For Python and Java SDKs, SDK object -> Beam proto conversion and SDK object -> Dataflow job request conversion were developed independently and both were subsequently updated to support x-lang.
AFAIK, SDK object -> Beam proto conversion was developed when we first added support for Beam runner API protos to the SDK. In the Python SDK we basically iteratively invoke the to_runner_api() implementations of the various SDK graph objects to build the full Beam pipeline proto. To support x-lang we updated the to_runner_api() method of ExternalTransform (implemented in to_runner_api_transform() for transforms) to attach the pipeline segment received from the external SDK during this process: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/external.py#L386

This has been working fairly well, but Brian recently found an issue related to unique ID collisions: https://github.com/apache/beam/pull/12067

Luke, do you think there's a fundamental issue with the above that we have to fix in addition to the direct Beam pipeline proto to Dataflow proto conversion that we are already working on? Note that this particular piece of code is used by all runners when using x-lang.

Thanks,
Cham
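As a rough illustration of the construction-time flow Cham describes, here is a minimal, self-contained Python sketch. It is not Beam's actual implementation: the names (Context, ToyTransform, ToyExternalTransform) are invented and plain dicts stand in for the runner API protos. The point it shows is that an already-expanded external segment is spliced into the pipeline proto as-is, rather than being rebuilt from SDK objects, so its IDs have to coexist with the IDs the local SDK hands out.

    # Hypothetical sketch (not Beam's actual code): dicts stand in for runner
    # API protos, and names like ToyTransform/ToyExternalTransform are made up.

    class Context:
        """Assigns the unique component ids that the pipeline proto requires."""
        def __init__(self):
            self._counter = 0
            self.components = {"transforms": {}, "pcollections": {}}

        def unique_id(self, prefix):
            self._counter += 1
            return f"{prefix}_{self._counter}"


    class ToyTransform:
        def __init__(self, label, parts=()):
            self.label = label
            self.parts = list(parts)

        def to_runner_api(self, context):
            # Ordinary transforms are converted by recursively converting parts.
            part_ids = [p.to_runner_api(context) for p in self.parts]
            tid = context.unique_id(self.label)
            context.components["transforms"][tid] = {
                "unique_name": self.label,
                "subtransforms": part_ids,
            }
            return tid


    class ToyExternalTransform(ToyTransform):
        """Holds the proto segment already returned by an expansion service."""
        def __init__(self, label, expanded_components, expanded_root_id):
            super().__init__(label)
            self.expanded_components = expanded_components
            self.expanded_root_id = expanded_root_id

        def to_runner_api(self, context):
            # Instead of re-converting SDK objects (there are none for the
            # foreign-language part), splice the expanded segment verbatim.
            # Any id already used by the expansion must not collide with ids
            # handed out by `context` -- the failure mode discussed in this thread.
            context.components["transforms"].update(self.expanded_components)
            return self.expanded_root_id


    if __name__ == "__main__":
        expanded = {"ext_1": {"unique_name": "External/JavaCount", "subtransforms": []}}
        pipeline = ToyTransform(
            "root",
            parts=[ToyTransform("Read"), ToyExternalTransform("Count", expanded, "ext_1")],
        )
        ctx = Context()
        root_id = pipeline.to_runner_api(ctx)
        print(root_id, list(ctx.components["transforms"]))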
On Thu, Jul 2, 2020 at 9:04 AM Robert Bradshaw <[email protected]> wrote:

> I agree this has been a major source of pain. The primary cause of issues is the conversion from Beam protos back to the SDK objects (which doesn't always have a good representation, especially for foreign-language components). In my experience, the SDK objects -> Beam proto conversions aren't generally a problem (except sometimes for objects that were formerly converted from protos).

> In my opinion, the solution is to convert to Beam protos and never convert back. (Well, not until we get to the workers, but at that point we can confidently say everything we need to decode does actually belong to the ambient environment.) As mentioned, Dataflow is being fixed, and the only other runner (in Python) that doesn't consume the Beam protos directly is the old direct runner (which Pablo is working on making obsolete, and which doesn't support cross-language anyway). So finally fixing Dataflow should be all we need to do. (Have we seen these issues on other runners?)

> On the Java side, I think all the optimization stuff works on its SDK representation, so care needs to be taken to make that conversion faithful, or to convert that code to act on the protos directly.

> As for why we went with the current approach, it's simply the fact that SDK representation -> Dataflow v1beta3 predated any of the Beam proto stuff, and re-using that code seemed easier than updating the Dataflow service to accept Beam protos (or re-writing it), as we had the SDK representation for everything in hand (until cross-language came along, that is).

> On Thu, Jul 2, 2020 at 3:11 AM Heejong Lee <[email protected]> wrote:

>> On Wed, Jul 1, 2020 at 7:18 PM Robert Burke <[email protected]> wrote:

>>> From the Go SDK side, it was built that way nearly from the start. Historically there was a direct SDK rep -> Dataflow rep conversion, but that's been replaced with an SDK rep -> Beam proto -> Dataflow rep conversion.

>>> In particular, this approach had a few benefits: it's easier to access local context for pipeline validation at construction time, to permit as early a failure as possible, which might be easier with native language constructs vs. Beam representations of them (e.g. DoFns not matching ParDo and collection types, and similar). Protos are convenient, but impose certain structure on how the pipeline graph is handled. (This isn't to say an earlier conversion isn't possible; one can do almost anything in code, but it lets the structure be optimised for this case.)

>>> The big advantage of translating from Beam proto -> Dataflow rep is that the Dataflow rep can get the various unique IDs that are mandated for the Beam proto process.

>>> However, the same can't really be said for the other way around. A good question is "when should the unique IDs be assigned?"

>> This is very true and I would like to elaborate more on the source of friction when using external transforms. As Robert mentioned, the pipeline proto refers to each component by unique ID, and the unique ID is only assigned when we convert the SDK pipeline object to the pipeline proto. Before XLang, pipeline object to pipeline proto conversion happened one time, during the job submission phase. However, after XLang transforms were introduced, it also happens when we request expansion of external transforms from the expansion service. A unique ID generated for the expansion request can be embedded in the returned external proto and conflict later with other unique IDs generated for the job submission.

>>> While I'm not working on adding XLang to the Go SDK directly (that would be our wonderful intern, Kevin), I've kind of pictured that the process would be to provide the expansion service with unique placeholders if unable to provide the right IDs, and substitute them in the returned pipeline graph segment afterwards, once that is known. That is, we can be relatively certain that the expansion service will be self-consistent, but it's the responsibility of the SDK requesting the expansion to ensure its IDs aren't colliding with the primary SDK's pipeline IDs.

>> AFAIK, we're already doing this in the Java and Python SDKs. Not providing a "placeholder", but remembering which pipeline object maps to which unique ID used in the expanded component proto.

>>> Otherwise, we could probably recommend a translation protocol (if one doesn't exist already; it probably does) that defines when XLang expansions are to happen in the SDK -> Beam proto process. So something like: Pass 1, intern all coders and PCollections; Pass 2, intern all DoFns and environments; Pass 3, expand XLang; etc.

>> Not sure I understand correctly, but a following transform that consumes the output of an external transform needs some information, like the output PCollection information from the expanded external transform, during the pipeline construction phase.

>>> The other half of this is what happens when going from Beam proto -> SDK. This happens during pipeline execution, but at least in the Go SDK it partly happens when creating the Dataflow rep. In particular, coder reference values only have a populated ID when they've been "rehydrated" from the Beam proto, since the Beam proto is the first place where such IDs are correctly assigned.

>>> TL;DR: I think the right question to sort out is when IDs should be expected to be assigned and available during pipeline construction.
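The friction Heejong describes can be shown with another small hypothetical sketch (an invented helper, not Beam code): a unique-ID generator that restarts from a fresh counter at expansion time and again at submission time can hand out the same ID for different components. Tracking which IDs the expanded segment already uses, one possible mitigation in the spirit of the placeholder and remembered-mapping ideas above, avoids the collision.

    # Hypothetical sketch of the id-collision failure mode and one mitigation.
    # Not Beam's actual implementation.

    import itertools


    def assign_ids(labels, taken=frozenset()):
        """Assign 'ref_N' ids to labels, skipping any ids already taken."""
        counter = itertools.count(1)
        ids = {}
        for label in labels:
            candidate = f"ref_{next(counter)}"
            while candidate in taken:
                candidate = f"ref_{next(counter)}"
            ids[label] = candidate
        return ids


    # Expansion time: the expanded segment ends up using ref_1 and ref_2.
    expanded_ids = assign_ids(["JavaCount", "JavaCount/output"])

    # Submission time, naive version: a fresh counter reuses ref_1 and ref_2
    # for completely different components, colliding with the expanded segment.
    naive_ids = assign_ids(["Read", "Read/output"])
    assert set(naive_ids.values()) & set(expanded_ids.values())  # collision!

    # Mitigation along the lines discussed above: remember which ids the
    # expanded components already use and never hand them out again.
    safe_ids = assign_ids(["Read", "Read/output"], taken=set(expanded_ids.values()))
    assert not set(safe_ids.values()) & set(expanded_ids.values())

    print(expanded_ids, naive_ids, safe_ids)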
>>> On Wed, Jul 1, 2020, 6:34 PM Luke Cwik <[email protected]> wrote:

>>>> It seems like we keep running into translation issues with XLang due to how it is represented in the SDK (e.g. Brian's work on the context map due to loss of coder IDs, Heejong's work related to missing environment IDs on windowing strategies).

>>>> I understand that there is an effort that is Dataflow-specific, where the conversion of the Beam proto -> Dataflow API (v1b3) will help with some issues, but it still requires the SDK pipeline representation -> Beam proto conversion to occur correctly, which won't be fixed by the Dataflow-specific effort.

>>>> Why did we go with the current approach?

>>>> What other ways could we do this?
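Robert Burke's "translation protocol" suggestion earlier in the thread can be read as an ordered set of passes over the SDK graph, with cross-language expansion as an explicit pass so that every ID assigned in earlier passes is already known when the expanded segment is merged. The following is a very rough, hypothetical sketch of that idea; the names (pipeline_graph, expansion_service.expand, reserved_ids) are invented for illustration and are not a real Beam or expansion-service API.

    # Hypothetical multi-pass SDK-graph -> pipeline-proto translation, roughly
    # in the spirit of the pass ordering suggested above. Invented names; not a
    # real Beam API.

    def translate(pipeline_graph, expansion_service):
        components = {"coders": {}, "pcollections": {}, "transforms": {}, "environments": {}}
        ids = {}  # SDK object -> already-assigned unique id, reused across passes

        def intern(obj, kind):
            if obj not in ids:
                ids[obj] = f"{kind}_{len(ids) + 1}"
                components[kind + "s"][ids[obj]] = {"label": getattr(obj, "label", repr(obj))}
            return ids[obj]

        # Pass 1: intern all coders and PCollections so their ids are fixed early.
        for coder in pipeline_graph.coders:
            intern(coder, "coder")
        for pc in pipeline_graph.pcollections:
            intern(pc, "pcollection")

        # Pass 2: intern DoFns/environments.
        for env in pipeline_graph.environments:
            intern(env, "environment")

        # Pass 3: expand cross-language transforms, telling the expansion service
        # (or the local merge step) which ids are already taken so the returned
        # segment cannot collide with them.
        for xlang in pipeline_graph.external_transforms:
            response = expansion_service.expand(xlang, reserved_ids=set(ids.values()))
            components["transforms"].update(response["transforms"])

        # Later passes would convert the remaining (native) transforms.
        return components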
