For the Python and Java SDKs, the SDK object -> Beam proto conversion and
the SDK object -> Dataflow job request conversion were developed
independently, and both were subsequently updated to support x-lang.

AFAIK the SDK object -> Beam proto conversion was developed when we first
added support for the Beam runner API protos to the SDKs. In the Python SDK
we basically build the full Beam pipeline proto by iteratively invoking the
to_runner_api() implementations of the various SDK graph objects. To support
x-lang we updated the to_runner_api() method of ExternalTransform
(implemented as to_runner_api_transform() for transforms) to attach the
pipeline segment received from the external SDK during this process.
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/external.py#L386
This has been working fairly well, but Brian recently found an issue
related to unique ID collisions: https://github.com/apache/beam/pull/12067
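
For illustration, here is a heavily simplified sketch of that composition.
Everything below is a hypothetical stand-in, not Beam's actual internals
(the real logic lives in pipeline.py and transforms/external.py):

class Context:
    """Stand-in for the pipeline context that collects components."""
    def __init__(self):
        self.transforms = {}  # unique ID -> transform "proto" (a dict here)
        self._counter = 0

    def unique_id(self, prefix):
        self._counter += 1
        return '%s_%d' % (prefix, self._counter)

class NativeTransform:
    def to_runner_api(self, context):
        # Ordinary SDK objects serialize themselves under a fresh ID.
        tid = context.unique_id('transform')
        context.transforms[tid] = {'spec': 'native'}
        return tid

class ExternalTransformSketch:
    """Stand-in for ExternalTransform's proto conversion."""
    def __init__(self, expanded_segment):
        # IDs inside this segment were minted by the expansion service,
        # which is why they can later collide with locally minted IDs
        # (the issue behind the PR above).
        self.expanded_segment = expanded_segment

    def to_runner_api(self, context):
        # Attach the pre-expanded segment verbatim.
        context.transforms.update(self.expanded_segment)
        return next(iter(self.expanded_segment))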

Luke, do you think there's a fundamental issue with the above that we have
to fix in addition to the direct Beam pipeline proto -> Dataflow proto
conversion that we are already working on? Note that this particular piece
of code is used by all runners when using x-lang.

Thanks,
Cham


On Thu, Jul 2, 2020 at 9:04 AM Robert Bradshaw <[email protected]> wrote:

> I agree this has been a major source of pain. The primary cause of issues
> is the conversion from Beam protos back to SDK objects (which don't
> always have a good representation, especially for foreign-language
> components). In my experience, the SDK object -> Beam proto conversions
> aren't generally a problem (except sometimes for objects that were
> previously converted from protos).
>
> In my opinion, the solution is to convert to Beam protos and never convert
> back. (Well, not until we get to the workers, but at that point we can
> confidently say everything we need to decode does actually belong to the
> ambient environment.) As mentioned, Dataflow is being fixed, and the only
> other runner (in Python) that doesn't consume the Beam protos directly is
> the old direct runner (which Pablo is working on making obsolete, and
> doesn't support cross-language anyway). So finally fixing Dataflow should
> be all we need to do. (Have we seen these issues on other runners?)
>
> On the Java side, I think all the optimization stuff works on its SDK
> representation, so care needs to be taken to make that conversion faithful,
> or to convert that code to act on the protos directly.
>
> As for why we went with the current approach, it's simply the fact that
> the SDK representation -> Dataflow v1beta3 conversion predated any of the
> Beam proto stuff, and re-using that code seemed easier than updating the
> Dataflow service to accept Beam protos (or re-writing it), as we had the
> SDK representation for everything in hand (until cross-language came along,
> that is).
>
>
> On Thu, Jul 2, 2020 at 3:11 AM Heejong Lee <[email protected]> wrote:
>
>>
>>
>> On Wed, Jul 1, 2020 at 7:18 PM Robert Burke <[email protected]> wrote:
>>
>>> From the Go SDK side, it was built that way nearly from the start.
>>> Historically there was a direct SDK rep -> Dataflow rep conversion, but
>>> that's been replaced with an SDK rep -> Beam proto -> Dataflow rep
>>> conversion.
>>>
>>> In particular, this approach had a few benefits: it's easier to access
>>> local context for pipeline validation at construction time, permitting as
>>> early a failure as possible, which might be easier with native language
>>> constructs vs. Beam representations of them (e.g. DoFns not matching
>>> ParDo and collection types, and similar).
>>> Protos are convenient, but impose certain structure on how the pipeline
>>> graph is handled. (This isn't to say an earlier conversion isn't possible,
>>> one can do almost anything in code, but it lets the structure be optimised
>>> for this case.)
>>>
>>> The big advantage of translating from Beam proto -> Dataflow rep is
>>> that the Dataflow rep can reuse the various unique IDs that are mandated
>>> by the Beam proto process.
>>>
>>> However, the same can't really be said for the other way around. A good
>>> question is "when should the unique IDs be assigned?"
>>>
>>
>> This is very true, and I would like to elaborate more on the source of
>> friction when using external transforms. As Robert mentioned, the pipeline
>> proto refers to each component by a unique ID, and that unique ID is only
>> assigned when we convert the SDK pipeline object to the pipeline proto.
>> Before XLang, the pipeline object to pipeline proto conversion happened
>> once, during the job submission phase. However, after XLang transforms
>> were introduced, it also happens when we request expansion of external
>> transforms from the expansion service. A unique ID generated for the
>> expansion request can be embedded in the returned expanded proto and
>> conflict later with other unique IDs generated for the job submission.
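>>
>> A contrived, runnable illustration of that collision (the ID generator
>> below is hypothetical, not Beam's actual one):
>>
>> import itertools
>>
>> def fresh_id_generator():
>>     # Each conversion pass starts its own counter from scratch.
>>     counter = itertools.count(1)
>>     return lambda prefix: '%s_%d' % (prefix, next(counter))
>>
>> # IDs minted while building the expansion request; the expansion
>> # service embeds 'coder_1' in the proto it returns.
>> expansion_id = fresh_id_generator()
>> external_coder_id = expansion_id('coder')  # 'coder_1'
>>
>> # A fresh counter is used later, at job submission time.
>> submission_id = fresh_id_generator()
>> local_coder_id = submission_id('coder')  # 'coder_1' again
>>
>> # Two distinct coders now share one ID in the final proto.
>> assert external_coder_id == local_coder_id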
>>
>>
>>>
>>> While I'm not working on adding XLang to the Go SDK directly (that would
>>> be our wonderful intern, Kevin), I've kind of pictured that the process
>>> was to provide the expansion service with unique placeholders if we're
>>> unable to provide the right IDs, and to substitute them in the returned
>>> pipeline graph segment afterwards, once the final IDs are known. That is,
>>> we can be relatively certain that the expansion service will be
>>> self-consistent, but it's the responsibility of the SDK requesting the
>>> expansion to ensure the returned IDs aren't colliding with the primary
>>> SDK's pipeline IDs.
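>>>
>>> A rough sketch of that placeholder-and-substitute idea, with the
>>> expanded segment modeled as a plain dict purely for illustration
>>> (nothing below is an existing Beam API):
>>>
>>> import uuid
>>>
>>> def make_placeholder(kind):
>>>     # An ID vanishingly unlikely to collide with primary SDK IDs.
>>>     return 'placeholder_%s_%s' % (kind, uuid.uuid4().hex)
>>>
>>> def substitute_ids(segment, id_map):
>>>     # Rewrite placeholder IDs (both keys and references) to final IDs.
>>>     return {
>>>         id_map.get(cid, cid):
>>>             {'inputs': [id_map.get(i, i) for i in body['inputs']]}
>>>         for cid, body in segment.items()}
>>>
>>> pc = make_placeholder('pcollection')
>>> segment = {pc: {'inputs': []}, 'xlang_read': {'inputs': [pc]}}
>>> final = substitute_ids(segment, {pc: 'pc_42'})  # real ID now known
>>> assert final['xlang_read']['inputs'] == ['pc_42']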
>>>
>>
>> AFAIK, we're already doing this in the Java and Python SDKs. We don't
>> provide a "placeholder", but we remember which pipeline object maps to
>> which unique ID used in the expanded component proto.
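>>
>> Something like this mapping approach, sketched with hypothetical names:
>>
>> # Remember the ID each pipeline object already carries in the expanded
>> # component proto, and hand the same ID back on every later conversion.
>> _id_by_object = {}
>>
>> def id_for(obj, fresh_id):
>>     if obj not in _id_by_object:
>>         _id_by_object[obj] = fresh_id()
>>     return _id_by_object[obj]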
>>
>>
>>>
>>> Otherwise, we could probably recommend a translation protocol (if one
>>> doesn't exist already; it probably does) specifying when XLang expansions
>>> are to happen in the SDK -> Beam proto process. So something like: Pass 1,
>>> intern all coders and PCollections; Pass 2, intern all DoFns and
>>> environments; Pass 3, expand XLang; etc.
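>>>
>>> A rough sketch of such a protocol, with the pipeline modeled as a plain
>>> dict and all pass names hypothetical:
>>>
>>> import itertools
>>>
>>> def translate(pipeline, expand_fn):
>>>     counter = itertools.count(1)
>>>     ids = {}
>>>     # Pass 1: intern all coders and PCollections so their IDs are fixed.
>>>     for pcoll in pipeline['pcollections']:
>>>         ids[pcoll] = 'pcollection_%d' % next(counter)
>>>     # Pass 2: intern all DoFns and environments.
>>>     for env in pipeline['environments']:
>>>         ids[env] = 'environment_%d' % next(counter)
>>>     # Pass 3: expand XLang last, handing the expansion service the full
>>>     # set of IDs it must not reuse.
>>>     return [expand_fn(t, reserved_ids=set(ids.values()))
>>>             for t in pipeline['external_transforms']]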
>>>
>>
>> Not sure I understand correctly, but a downstream transform that consumes
>> the output of an external transform needs some information, such as the
>> output PCollection, from the expanded external transform during the
>> pipeline construction phase.
>>
>>
>>> The other half of this is what happens when going from Beam proto ->
>>> SDK rep. This happens during pipeline execution, but at least in the Go
>>> SDK it partly happens when creating the Dataflow rep. In particular,
>>> coder reference values only have a populated ID when they've been
>>> "rehydrated" from the Beam proto, since the Beam proto is the first place
>>> where such IDs are correctly assigned.
>>>
>>> TL;DR: I think the right question to sort out is when IDs should be
>>> expected to be assigned and available during pipeline construction.
>>>
>>> On Wed, Jul 1, 2020, 6:34 PM Luke Cwik <[email protected]> wrote:
>>>
>>>> It seems like we keep running into translation issues with XLang due to
>>>> how it is represented in the SDK (e.g. Brian's work on the context map
>>>> due to loss of coder IDs, Heejong's work related to missing environment
>>>> IDs on windowing strategies).
>>>>
>>>> I understand that there is a Dataflow-specific effort where the
>>>> conversion of the Beam proto -> Dataflow API (v1b3) will help with some
>>>> issues, but it still requires the SDK pipeline representation -> Beam
>>>> proto conversion to occur correctly, which won't be fixed by the
>>>> Dataflow-specific effort.
>>>>
>>>> Why did we go with the current approach?
>>>>
>>>> What other ways could we do this?
>>>>
>>>
