Reading back, I wasn't clear: the Go SDK does Option (1), putting the LP explicitly into the runner proto during encoding [1], and expecting LPs to contain a custom coder URN on decode for execution [2]. (Modulo an old bug in Dataflow where the URN was empty.)
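For concreteness, that option (1) wrapping step can be sketched roughly as follows. This is an illustrative sketch only, not the real Beam proto API: coders are plain dicts standing in for the coder proto, and the URN set is abbreviated.

```python
# Sketch of option (1): while constructing the pipeline proto, make the
# length prefix explicit around any coder URN the runner does not know.
# The URN set below is abbreviated and the dict "proto" is illustrative.
KNOWN_CODER_URNS = {
    "beam:coder:varint:v1",
    "beam:coder:kv:v1",
    "beam:coder:length_prefix:v1",
}

def length_prefix_unknown(coder):
    """coder: {"urn": str, "components": [coder, ...]}."""
    if coder["urn"] == "beam:coder:kv:v1":
        # Recurse so an unknown key coder inside a KV gets wrapped too.
        return {"urn": coder["urn"],
                "components": [length_prefix_unknown(c)
                               for c in coder["components"]]}
    if coder["urn"] in KNOWN_CODER_URNS:
        return coder
    # Unknown to the runner: record the length prefix in the proto itself.
    return {"urn": "beam:coder:length_prefix:v1", "components": [coder]}
```

With this, a KvCoder<CustomCoder, VarIntCoder> becomes KvCoder<LengthPrefixCoder<CustomCoder>, VarIntCoder> in the proto, so the runner and the SDK harness both read the wrapping out of the proto instead of agreeing on it implicitly.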
[1] https://github.com/apache/beam/blob/4364a214dfe6d8d5dd84b1bb91d579f466492ca5/sdks/go/pkg/beam/core/runtime/graphx/coder.go#L348 [2] https://github.com/apache/beam/blob/4364a214dfe6d8d5dd84b1bb91d579f466492ca5/sdks/go/pkg/beam/core/runtime/graphx/coder.go#L219 On Fri, Nov 8, 2019, 10:28 AM Robert Bradshaw <rober...@google.com> wrote: > On Fri, Nov 8, 2019 at 2:09 AM jincheng sun <sunjincheng...@gmail.com> > wrote: > > > > Hi, > > > > Sorry for my late reply. It seems the conclusion has been reached. I > just want to share my personal thoughts. > > > > Generally, both option 1 and 3 make sense to me. > > > > >> The key concept here is not "standard coder" but "coder that the > > >> runner does not understand." This knowledge is only in the runner. > > >> Also has the downside of (2). > > > > >Yes, I had assumed "non-standard" and "unknown" are the same, but the > > >latter can be a subset of the former, i.e. if a Runner does not support > > >all of the standard coders for some reason. > > > > I also assume that "non-standard" and "unknown" are the same. > Currently, on the runner side [1] it > > decides whether the coder is unknown (and wraps it with a length prefix coder) > according to whether the coder is among > > the standard coders. It will not communicate with the harness to make this > decision. > > > > So, from my point of view, we can update the PR according to option 1 or > 3. > > > > [1] > https://github.com/apache/beam/blob/66a67e6580b93906038b31ae7070204cec90999c/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java#L62 > > That list is populated in Java code [1] and has typically been a > subset of what is in the proto file. Things like StringUtf8Coder and > DoubleCoder have been added at different times to different SDKs and > Runners, sometimes long after the URN is in the proto. Having to keep > this list synchronized (and versioned) would be a regression. 
> > [1] > https://github.com/apache/beam/blob/release-2.17.0/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java > > The PR taking approach (1) looks good at a first glance (I see others > are reviewing it). Thanks. > > > Maximilian Michels <m...@apache.org> 于2019年11月8日周五 上午3:35写道: > >> > >> > While the Go SDK doesn't yet support a State API, Option 3) is what > the Go SDK does for all non-standard coders (aka custom coders) anyway. > >> > >> For wire transfer, the Java Runner also adds a LengthPrefixCoder for the > >> coder and its subcomponents. The problem is that this is an implicit > >> assumption made. In the Proto, we do not have this represented. This is > >> why **for state requests**, we end up with a > >> "LengthPrefixCoder[CustomCoder]" on the Runner and a "CustomCoder" on > >> the SDK Harness side. Note that the Python Harness does wrap unknown > >> coders in a LengthPrefixCoder for transferring regular elements, but the > >> LengthPrefixCoder is not preserved for the state requests. > >> > >> In that sense (3) is good because it follows this implicit notion of > >> adding a LengthPrefixCoder for wire transfer, but applies it to state > >> requests. > >> > >> However, option (1) is most reliable because the LengthPrefixCoder is > >> actually in the Proto. So "CustomCoder" will always be represented as > >> "LengthPrefixCoder[CustomCoder]", and only standard coders will be added > >> without a LengthPrefixCoder. > >> > >> > I'd really like to avoid implicit agreements about how the coder that > >> > should be used differs from what's specified in the proto in different > >> > contexts. > >> > >> Option (2) would work on top of the existing logic because replacing a > >> non-standard coder with a "NOOP coder" would just be used by the Runner > >> to produce a serialized version of the key for partitioning. Flink > >> always operates on the serialized key, be it standard or non-standard > >> coder. 
It wouldn't be necessary to change any of the existing wire > >> transfer logic or representation. I understand that it would be less > >> ideal, but maybe easier to fix for the release. > >> > >> > The key concept here is not "standard coder" but "coder that the > >> > runner does not understand." This knowledge is only in the runner. > >> > Also has the downside of (2). > >> > >> Yes, I had assumed "non-standard" and "unknown" are the same, but the > >> latter can be a subset of the former, i.e. if a Runner does not support > >> all of the standard coders for some reason. > >> > >> > This means that the wire format that the runner sends for the "key" > represents the exact same wire format it will receive for state requests. > >> > >> The wire format for the entire element is the same. Otherwise we > >> wouldn't be able to process data between the Runner and the SDK Harness. > >> However, the problem is that the way the Runner instantiates the key > >> coder to partition elements does not match how the SDK encodes the key > >> when it sends a state request to the Runner. Conceptually, those two > >> situations should be the same, but in practice they are not. > >> > >> > >> Now that I thought about it again, option (1) is probably the most > >> explicit and in that sense cleanest. However, option (3) is kind of fair > >> because it would just replicate the implicit LengthPrefixCoder behavior > >> we have for general wire transfer also for state requests. Option (2) I > >> suppose is the most implicit and runner-specific, and should probably be > >> avoided in the long run. 
> >> > >> So I'd probably opt for (1) and I would update the PR[1] rather soon > >> because this currently blocks the release, as this is a regression from > >> 2.16.0.[2] > >> > >> > >> -Max > >> > >> [1] https://github.com/apache/beam/pull/9997 > >> [2] (In 2.16.0 it worked for Python because the Runner used a > >> ByteArrayCoder with the OUTER encoding context for the key which was > >> basically option (2). Only problem that, for standard coders the Java > >> SDK Harness produced non-matching state request keys, due to it using > >> the NESTED context.) > >> > >> On 07.11.19 18:01, Luke Cwik wrote: > >> > > >> > > >> > On Thu, Nov 7, 2019 at 8:22 AM Robert Bradshaw <rober...@google.com > >> > <mailto:rober...@google.com>> wrote: > >> > > >> > On Thu, Nov 7, 2019 at 6:26 AM Maximilian Michels <m...@apache.org > >> > <mailto:m...@apache.org>> wrote: > >> > > > >> > > Thanks for the feedback thus far. Some more comments: > >> > > > >> > > > Instead, the runner knows ahead of time that it > >> > > > will need to instantiate this coder, and should update the > bundle > >> > > > processor to specify KvCoder<LengthPrefixCoder<CustomCoder>, > >> > > > VarIntCoder> as the coder so both can pull it out in a > >> > consistent way. > >> > > > >> > > By "update the bundle processor", do you mean modifying the > >> > > ProcessBundleDescriptor's BagUserState with the correct key > coder? > >> > > Conceptually that is possible, but the current implementation > >> > does not > >> > > allow for this to happen: > >> > > > >> > > https://github.com/apache/beam/blob/076a037e53ca39b61c1dfeb580527bc8d0371dc1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java#L284 > >> > > It enforces ByteString which does not tell the SDK Harness > anything > >> > > about the desired encoding. 
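To unpack the OUTER vs NESTED remark in footnote [2] above: a coder in the NESTED context must delimit its own output, while in the OUTER (outermost) context it may write raw bytes. A toy sketch of this distinction (hand-rolled varint, illustrative only, not the actual SDK classes):

```python
# Illustrative sketch of NESTED vs OUTER encoding contexts: in the NESTED
# context a bytes coder must delimit itself with a varint length prefix;
# in the OUTER context the raw bytes are written as-is.
def encode_varint(n: int) -> bytes:
    out = bytearray()
    while True:
        b = n & 0x7F
        n >>= 7
        if n:
            out.append(b | 0x80)
        else:
            out.append(b)
            return bytes(out)

def encode_bytes(value: bytes, nested: bool) -> bytes:
    return (encode_varint(len(value)) + value) if nested else value

# The same key yields different bytes depending on the context:
assert encode_bytes(b"key", nested=False) == b"key"
assert encode_bytes(b"key", nested=True) == b"\x03key"
```

So a runner partitioning on the OUTER encoding while the SDK sends NESTED (length-prefixed) state-request keys, or vice versa, sees different bytes for the same logical key.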
> >> > > >> > I meant update the BundleProcessDescriptor proto that is sent to > the > >> > SDK > >> > > https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/beam_fn_api.proto#L140 > , > >> > essentially option (1). > >> > > >> > > >> > For clarity, the "key" coder is specified by the stateful ParDo's main > >> > input PCollection. This means that the ProcessBundleDescriptor should > >> > have something that has the length prefix as part of the remote grpc > >> > port specification AND the PCollection that follows it which is the > main > >> > input for the stateful ParDo. This means that the wire format that the > >> > runner sends for the "key" represents the exact same wire format it > will > >> > receive for state requests. > >> > > >> > I see what you mean Max, > >> > > https://github.com/apache/beam/blob/076a037e53ca39b61c1dfeb580527bc8d0371dc1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java#L284 > >> > could change to represent the actual coder (whether it be > >> > LengthPrefixCoder<BytesCoder> or some other model coder combination). > >> > Currently that logic assumes that the runner can perform the backwards > >> > mapping by decoding the bytestring with the appropriate coder. > >> > > >> > > Since the above does not seem feasible, I see the following > options: > >> > > > >> > > (1) Modify the pipeline Proto before the translation and wrap a > >> > > LengthPrefixCoder around non-standard key coders for stateful > >> > > transforms. This would change the encoding for the entire > >> > element, to be > >> > > sure that the key coder for state requests contains a > >> > LengthPrefixCoder > >> > > for state requests from the SDK Harness. Not optimal. > >> > > >> > Yes. The contract should be that both the runner and SDK use the > >> > coders that are specified in the proto. 
The runner controls the > proto, > >> > and should ensure it only sends protos it will be able to handle > the > >> > SDK responding to. I'm not seeing why this is sub-optimal. > >> > > >> > > (2) Add a new method WireCoders#instantiateRunnerWireKeyCoder > which > >> > > returns the correct key coder, i.e. for standard coders, the > concrete > >> > > coder, and for non-standard coders a ByteArrayCoder. We also > need to > >> > > ensure the key encoding on the Runner side is OUTER context, > to avoid > >> > > adding a length prefix to the encoded bytes. Basically, the > >> > non-standard > >> > > coders result in a NOOP coder which does not touch the key > bytes. > >> > > >> > I'd really like to avoid implicit agreements about how the coder > that > >> > should be used differs from what's specified in the proto in > different > >> > contexts. > >> > > >> > > (3) Patch the Python SDK to ensure non-standard state key > coders are > >> > > always wrapped in a LengthPrefixCoder. That way, we can keep > the > >> > > existing logic on the Runner side. > >> > > >> > The key concept here is not "standard coder" but "coder that the > >> > runner does not understand." This knowledge is only in the runner. > >> > Also has the downside of (2). > >> > > >> > > Option (2) seems like the most practical. > >> > > > >> > > -Max > >> > > > >> > > On 06.11.19 17:26, Robert Bradshaw wrote: > >> > > > On Wed, Nov 6, 2019 at 2:55 AM Maximilian Michels > >> > <m...@apache.org <mailto:m...@apache.org>> wrote: > >> > > >> > >> > > >> Let me try to clarify: > >> > > >> > >> > > >>> The Coder used for State/Timers in a StatefulDoFn is pulled > >> > out of the > >> > > >>> input PCollection. If a Runner needs to partition by this > >> > coder, it > >> > > >>> should ensure the coder of this PCollection matches with > the > >> > Coder > >> > > >>> used to create the serialized bytes that are used for > >> > partitioning > >> > > >>> (whether or not this is length-prefixed). 
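The partitioning point can be made concrete with a toy sketch (names assumed for illustration): the runner keys its state and its partitioning by the encoded bytes, so a state request whose key encoding differs by even a length prefix misses.

```python
# Toy sketch (assumed names): the runner partitions and stores state by the
# *encoded* key bytes. If the SDK length-prefixes the key in its state
# request but the runner did not length-prefix when partitioning, the bytes
# differ and the lookup misses, even though the logical key is the same.
state = {}

runner_key = b"key"           # runner's encoding of the key, no prefix
state[runner_key] = "value"

sdk_request_key = b"\x03key"  # SDK's state-request key, length-prefixed
assert sdk_request_key not in state       # lookup misses
assert sdk_request_key[1:] == runner_key  # only the prefix differs
```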
> >> > > >> > >> > > >> That is essentially what I had assumed when I wrote the > code. The > >> > > >> problem is the coder can be "pulled out" in different ways. > >> > > >> > >> > > >> For example, let's say we have the following Proto > PCollection > >> > coder > >> > > >> with non-standard coder "CustomCoder" as the key coder: > >> > > >> > >> > > >> KvCoder<CustomCoder, VarIntCoder> > >> > > >> > >> > > >> From the Runner side, this currently looks like the > following: > >> > > >> > >> > > >> PCol: KvCoder<LengthPrefixCoder<ByteArrayCoder>, > VarIntCoder> > >> > > >> Key: LengthPrefixCoder<ByteArrayCoder> > >> > > > > >> > > > This is I think where the error is. If the proto > references > >> > > > KvCoder<CustomCoder, VarIntCoder> it should not be pulled > out as > >> > > > KvCoder<LengthPrefixCoder<ByteArrayCoder>, VarIntCoder>; as > that > >> > > > doesn't have the same encoding. Trying to instantiate > such a > >> > coder > >> > > > should be an error. Instead, the runner knows ahead of time > that it > >> > > > will need to instantiate this coder, and should update the > bundle > >> > > > processor to specify KvCoder<LengthPrefixCoder<CustomCoder>, > >> > > > VarIntCoder> as the coder so both can pull it out in a > >> > consistent way. > >> > > > > >> > > > When the coder is KvCoder<LengthPrefixCoder<CustomCoder>, > >> > VarIntCoder> > >> > > > instantiating it as KvCoder<ByteArrayCoder, VarIntCoder> on > the > >> > runner > >> > > > is of course OK as they do have the same encoding. > >> > > > > >> > > >> At the SDK Harness, we have the coder available: > >> > > >> > >> > > >> PCol: KvCoder<CustomCoder, VarIntCoder> > >> > > >> Key: CustomCoder > >> > > >> > >> > > >> Currently, when the SDK Harness serializes a key for a state > >> > request, > >> > > >> the custom coder may happen to add a length prefix, or it > may > >> > not. It > >> > > >> depends on the coder used. 
The correct behavior would be to > >> > use the same > >> > > >> representation as on the Runner side. > >> > > >> > >> > > >>> Specifically, "We have no way of telling from the Runner > >> > side, if a length prefix has been used or not." seems false > >> > > >> > >> > > >> The Runner cannot inspect an unknown coder, it only has the > >> > opaque Proto > >> > > >> information available which does not allow introspection of > >> > non-standard > >> > > >> coders. With the current state, the Runner may think the > coder > >> > adds a > >> > > >> length prefix but the Python SDK worker could choose to add > >> > none. This > >> > > >> produces an inconsistent key encoding. See above. > >> > > > > >> > > > I think what's being conflated here is "the Coder has been > >> > wrapped in > >> > > > a LengthPrefixCoder" vs. "the coder does length prefixing." > >> > These are > >> > > > two orthogonal concepts. The runner in general only knows the > >> > former. > >> > > > > >> > > >> It looks like the key encoding for state requests on the > >> > Python SDK > >> > > >> Harness side is broken. For transferring elements of a > >> > PCollection, the > >> > > >> coders are obviously working correctly, but for encoding > >> > solely the key > >> > > >> of an element, there is a consistency issue. > >> > > >> > >> > > >> > >> > > >> -Max > >> > > >> > >> > > >> On 06.11.19 05:35, Kenneth Knowles wrote: > >> > > >>> Specifically, "We have no way of telling from the Runner > >> > side, if a > >> > > >>> length prefix has been used or not." seems false. The > runner > >> > has all the > >> > > >>> information since length prefix is a model coder. Didn't we > >> > agree that > >> > > >>> all coders should be self-delimiting in runner/SDK > interactions, > >> > > >>> requiring length-prefix only when there is an opaque or > >> > dynamic-length > >> > > >>> value? I assume you mean that at runtime the worker for a > >> > given engine > >> > > >>> does not know? 
> >> > > >>> > >> > > >>> Kenn > >> > > >>> > >> > > >>> On Tue, Nov 5, 2019 at 3:19 PM Luke Cwik <lc...@google.com > >> > <mailto:lc...@google.com> > >> > > >>> <mailto:lc...@google.com <mailto:lc...@google.com>>> > wrote: > >> > > >>> > >> > > >>> +1 to what Robert said. > >> > > >>> > >> > > >>> On Tue, Nov 5, 2019 at 2:36 PM Robert Bradshaw > >> > <rober...@google.com <mailto:rober...@google.com> > >> > > >>> <mailto:rober...@google.com > >> > <mailto:rober...@google.com>>> wrote: > >> > > >>> > >> > > >>> The Coder used for State/Timers in a StatefulDoFn is > >> > pulled out > >> > > >>> of the > >> > > >>> input PCollection. If a Runner needs to partition by > >> > this coder, it > >> > > >>> should ensure the coder of this PCollection matches > >> > with the Coder > >> > > >>> used to create the serialized bytes that are used > >> > for partitioning > >> > > >>> (whether or not this is length-prefixed). > >> > > >>> > >> > > >>> Concretely, the graph looks like
> >> > > >>>
> >> > > >>>     Runner                          SDK Harness
> >> > > >>>
> >> > > >>>     WriteToGbk
> >> > > >>>         |
> >> > > >>>     ReadFromGbk
> >> > > >>>         |
> >> > > >>>     RunnerMapFn.processKeyValue(key, value)
> >> > > >>>         |
> >> > > >>>     WriteToDataChannel
> >> > > >>>         ------------------------>
> >> > > >>>                                     ReadFromDataChannel
> >> > > >>>                                         |
> >> > > >>>                                       (pcIn)
> >> > > >>>                                         |
> >> > > >>>                                     MyStatefulDoFn.process(key, value)
> >> > > >>>
> >> > > >>> Now the (key part of the) Coder of pcIn, which comes > >> > from the proto > >> > > >>> that the Runner sent to the SDK, must match the (key > >> > part of the) > >> > > >>> encoding used in WriteToGbk and ReadFromGbk. If a > >> > LengthPrefix is > >> > > >>> added > >> > > >>> in one spot, it must be added in the other. 
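That invariant can be checked mechanically: if both sides agree the key coder is LengthPrefixCoder[...], the runner can treat the payload as opaque bytes and still reproduce the SDK's exact wire bytes. A toy sketch (CustomCoder is a stand-in, and a single length byte substitutes for a real varint):

```python
# Sketch: LengthPrefixCoder[CustomCoder] on the SDK side and
# LengthPrefixCoder[bytes] on the runner side agree on the wire, because
# the prefix delimits an opaque payload the runner never has to decode.
def length_prefixed(payload: bytes) -> bytes:
    assert len(payload) < 128  # one length byte suffices for this sketch
    return bytes([len(payload)]) + payload

class CustomCoder:  # stand-in for an SDK-specific coder, opaque to the runner
    def encode(self, value) -> bytes:
        return repr(value).encode()

sdk_bytes = length_prefixed(CustomCoder().encode("key"))
# The runner decodes with LengthPrefixCoder[bytes] and re-encodes losslessly:
payload = sdk_bytes[1:1 + sdk_bytes[0]]
assert length_prefixed(payload) == sdk_bytes
```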
> >> > > >>> > >> > > >>> [1] > >> > > >>> > >> > > https://github.com/apache/beam/blob/release-2.17.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L1183 > >> > > >>> > >> > > >>> On Tue, Nov 5, 2019 at 1:25 PM Maximilian Michels > >> > > >>> <m...@apache.org <mailto:m...@apache.org> > >> > <mailto:m...@apache.org <mailto:m...@apache.org>>> wrote: > >> > > >>> > > >> > > >>> > Hi, > >> > > >>> > > >> > > >>> > I wanted to get your opinion on something that I > >> > have been > >> > > >>> struggling > >> > > >>> > with. It is about the coders for state requests > >> > in portable > >> > > >>> pipelines. > >> > > >>> > > >> > > >>> > In contrast to "classic" Beam, the Runner is not > >> > guaranteed > >> > > >>> to know > >> > > >>> > which coder is used by the SDK. If the SDK > >> > happens to use a > >> > > >>> standard > >> > > >>> > coder (also known as model coder), we will also > >> > have it > >> > > >>> available at the > >> > > >>> > Runner, i.e. if the Runner is written in one of > >> > the SDK > >> > > >>> languages (e.g. > >> > > >>> > Java). However, when we do not have a standard > >> > coder, we just > >> > > >>> treat the > >> > > >>> > data from the SDK as a blob and just pass it > >> > around as bytes. > >> > > >>> > > >> > > >>> > Problem > >> > > >>> > ======= > >> > > >>> > > >> > > >>> > In the case of state requests which the SDK > >> > Harness sends > >> > > >>> to the > >> > > >>> > Runner, we would like for the key associated with > >> > the state > >> > > >>> request to > >> > > >>> > match the key of the element which led to > >> > initiating the > >> > > >>> state request. 
> >> > > >>> > > >> > > >>> > Example:
> >> > > >>> >
> >> > > >>> >   Runner                      SDK Harness
> >> > > >>> >   ------                      -----------
> >> > > >>> >
> >> > > >>> >   KV["key","value"]       --> Process Element
> >> > > >>> >                                   |
> >> > > >>> >   LookupState("key")      <-- Request state of "key"
> >> > > >>> >       |
> >> > > >>> >   State["key"]            --> Receive state
> >> > > >>> >
> >> > > >>> > For stateful DoFns, the Runner partitions the > >> > data based on > >> > > >>> the key. In > >> > > >>> > Flink, this partitioning must not change during > >> > the lifetime of a > >> > > >>> > pipeline because the checkpointing otherwise > >> > breaks[0]. The > >> > > >>> key is > >> > > >>> > extracted from the element and stored encoded. > >> > > >>> > > >> > > >>> > If we have a standard coder, it is basically the > >> > same as in the > >> > > >>> > "classic" Runner which takes the key and > >> > serializes it. > >> > > >>> However, when we > >> > > >>> > have an SDK-specific coder, we basically do not > >> > know how it > >> > > >>> encodes. So > >> > > >>> > far, we have been using the coder instantiated > >> > from the > >> > > >>> Proto, which is > >> > > >>> > basically a LengthPrefixCoder[ByteArrayCoder] or > >> > similar[1]. > >> > > >>> We have had > >> > > >>> > problems with this because the key encoding of > >> > Java SDK state > >> > > >>> requests > >> > > >>> > did not match the key encoding on the Runner side > >> > [2]. In an > >> > > >>> attempt to > >> > > >>> > fix those, it is now partly broken for portable > >> > Python pipelines. > >> > > >>> > Partly, because it "only" affects non-standard > >> > coders. > >> > > >>> > > >> > > >>> > Non-standard coders yield the aforementioned > >> > > >>> > LengthPrefixCoder[ByteArrayCoder]. Now, following > >> > the usual > >> > > >>> encoding > >> > > >>> > scheme, we would simply encode the key using this > >> > coder. 
> >> > > >>> However, for > >> > > >>> > state requests, the Python SDK leaves out the > >> > length prefix > >> > > >>> for certain > >> > > >>> > coders, e.g. for primitives like int or byte. It > >> > is possible > >> > > >>> that one > >> > > >>> > coder uses a length prefix, while another > >> > doesn't. We have no > >> > > >>> way of > >> > > >>> > telling from the Runner side if a length prefix > >> > has been > >> > > >>> used or not. > >> > > >>> > This results in the keys not matching on the > >> > Runner side and the > >> > > >>> > partitioning being broken. > >> > > >>> > > >> > > >>> > > >> > > >>> > How to solve this? > >> > > >>> > ================== > >> > > >>> > > >> > > >>> > (1) Should this simply be fixed on the Python SDK > >> > side? One > >> > > >>> fix would be > >> > > >>> > to always append a length prefix to the key in state > >> > > >>> requests, even for > >> > > >>> > primitive coders like VarInt which do not use one. > >> > > >>> > > >> > > >>> > OR > >> > > >>> > > >> > > >>> > (2) Should the Runner detect that a non-standard > >> > coder is > >> > > >>> used? If so, > >> > > >>> > it should just pass the bytes from the SDK > >> > Harness and never > >> > > >>> make an > >> > > >>> > attempt to construct a coder based on the Proto. > >> > > >>> > > >> > > >>> > > >> > > >>> > Thinking about it now, it seems pretty obvious > >> > that (2) is > >> > > >>> the most > >> > > >>> > feasible way to avoid complications across all > >> > current and > >> > > >>> future SDKs > >> > > >>> > for key encodings. Still, it is odd that the > >> > Proto contains coder > >> > > >>> > information which is not usable. > >> > > >>> > > >> > > >>> > What do you think? > >> > > >>> > > >> > > >>> > > >> > > >>> > Thanks, > >> > > >>> > Max > >> > > >>> > > >> > > >>> > > >> > > >>> > [0] It is possible to restart the pipeline and > >> > repartition the > >> > > >>> > checkpointed data. 
> >> > > >>> > [1] > >> > > >>> > > >> > > https://github.com/apache/beam/blob/c39752af5391fe698a2b4f1489c187ddd4d604c0/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java#L682 > >> > > >>> > [2] https://issues.apache.org/jira/browse/BEAM-8157