[ https://issues.apache.org/jira/browse/BEAM-3203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kenneth Knowles updated BEAM-3203:
----------------------------------
    Labels: portability triaged  (was: portability)

> Spec out what mutations are allowed to a constructed model pipeline, particularly coders
> ----------------------------------------------------------------------------------------
>
>                 Key: BEAM-3203
>                 URL: https://issues.apache.org/jira/browse/BEAM-3203
>             Project: Beam
>          Issue Type: Improvement
>          Components: beam-model
>            Reporter: Kenneth Knowles
>            Priority: Major
>              Labels: portability, triaged
>
> Context: presume an SDK has constructed a pipeline or sub-pipeline, and sent
> it - as a model proto - to another party, which could be a runner or another
> SDK.
> Question to be resolved: What mutations are allowed to this pipeline?
> For example, depending on how an SDK harness is implemented, some coders (aka
> wire formats) can be swapped while leaving the language-level types
> compatible. One such pair is "urn:beam:coder:varlong" and
> "urn:beam:coder:bigendianlong". It may also be possible to add or remove
> length prefixes in some situations.
> What we mean by _coder_ is a wire format specification for a _stream_ of
> elements, specified by a {{FunctionSpec}} proto and its component coders
> (and so on recursively).
> For many coders, if the encoding is not known to a party, then the boundaries
> of elements cannot be discerned. But there are lots of situations where
> boundaries need to be known without full decoding - particularly by runners,
> but also at some point for SDK-to-SDK transmission.
> *Possibility 1*: insist that a coder...
> {code}
> Coder {
>   spec: FunctionSpec { urn: "beam:coder:my_whatever_coder" }
> }
> {code}
> ... is always allowed to be replaced by the same coder, wrapped with an added
> length prefix ...
> {code}
> Coder {
>   spec: FunctionSpec { urn: "beam:coder:add_length_prefix" }
>   component_coders: [
>     Coder {
>       spec: FunctionSpec { urn: "beam:coder:my_whatever_coder" }
>     }
>   ]
> }
> {code}
> This places a responsibility on each SDK harness to understand this coder and
> also to be able to execute the same UDFs with the decoded values. This is
> already sort of implicit in how the Fn API produces ProcessBundleDescriptors,
> since a runner can never assume that it understands SDK coders.
> *Possibility 2*: allow optimization by indicating a way to determine element
> boundaries
> It may be that even for a coder that cannot be understood, the element
> boundaries can be easily discerned. For example, if a coder _already_ puts a
> length prefix in a known format at the start of each element, you just need
> to pull that out. This means that for an unknown coder, you can save the
> computation and space of adding a length prefix. (If you can understand
> "urn:beam:coder:add_length_prefix" then that special case is already handled.)
> It might look something like this:
> {code}
> Coder {
>   spec: FunctionSpec { urn: "beam:coder:my_whatever_coder" }
>   also_decodes_as: Coder {
>     spec: FunctionSpec { urn: "beam:coder:add_length_prefix" }
>     component_coders: [
>       Coder {
>         spec: FunctionSpec { urn: "beam:coder:uninterpretable_bytes" }
>       }
>     ]
>   }
> }
> {code}
> The extra coder in {{also_decodes_as}} must be completely wire-compatible and
> should always be composed of completely standardized coders, so element
> boundaries can always be ascertained. An annoyance here is the possibility
> of silly protos where this recurses. Since the main implementation we expect
> is a length prefix, it could just be a flag, or just a coder for the length
> prefix itself.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
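
To make the length-prefix idea in Possibility 1 concrete, here is a minimal Python sketch. It assumes the prefix is a base-128 varint byte count written before each element's payload; the issue text does not fix a format, so treat that as an assumption. All names here, including `my_whatever_encode`, are hypothetical and are not Beam APIs. The point it illustrates is that a party which only understands the wrapper can still find element boundaries while treating each payload as opaque bytes.

```python
import io
from typing import Callable, Iterable, List


def write_varint(n: int, out: io.BytesIO) -> None:
    """Write a non-negative integer as a base-128 varint (assumed prefix format)."""
    if n < 0:
        raise ValueError("length must be non-negative")
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out.write(bytes([byte | 0x80]))  # more bytes follow
        else:
            out.write(bytes([byte]))  # last byte of the varint
            return


def read_varint(stream: io.BytesIO) -> int:
    """Read one base-128 varint from the stream."""
    result = 0
    shift = 0
    while True:
        raw = stream.read(1)
        if not raw:
            raise EOFError("stream ended inside a varint")
        byte = raw[0]
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result
        shift += 7


def encode_with_length_prefix(
    elements: Iterable[str], encode_element: Callable[[str], bytes]
) -> bytes:
    """Encode a stream of elements, prefixing each payload with its byte length."""
    out = io.BytesIO()
    for element in elements:
        payload = encode_element(element)
        write_varint(len(payload), out)
        out.write(payload)
    return out.getvalue()


def split_elements(data: bytes) -> List[bytes]:
    """Recover element boundaries from the prefixes alone; payloads stay opaque."""
    stream = io.BytesIO(data)
    chunks = []
    while stream.tell() < len(data):
        size = read_varint(stream)
        payload = stream.read(size)
        if len(payload) != size:
            raise EOFError("stream ended inside an element")
        chunks.append(payload)
    return chunks


def my_whatever_encode(value: str) -> bytes:
    """Hypothetical SDK-specific coder: raw UTF-8 with no delimiter of its own."""
    return value.encode("utf-8")


stream_bytes = encode_with_length_prefix(["ab", "", "cde"], my_whatever_encode)
chunks = split_elements(stream_bytes)
```

`split_elements` never calls `my_whatever_encode` or any matching decoder, which mirrors a runner that cannot interpret the inner coder. Under Possibility 2, a coder that already writes such a prefix would let the same splitting logic run without the extra wrapping step.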