Thanks for sharing your thoughts, which helped me understand the design of the FnAPI more deeply. It makes much more sense to me now.
Many thanks, Robert!

Best,
Jincheng

On Tue, Nov 12, 2019 at 2:10 AM Robert Bradshaw <rober...@google.com> wrote:

> On Fri, Nov 8, 2019 at 10:04 PM jincheng sun <sunjincheng...@gmail.com> wrote:
>
>>> Let us first define what "standard coders" are. Usually it should be the coders defined in the Proto. However, personally I think the coders defined in the Java ModelCoders [1] seem more appropriate. The reason is that a coder which already appears in the Proto but has not yet been added to the Java ModelCoders is always replaced by the runner with LengthPrefixCoder[ByteArrayCoder], so the SDK harness will decode the data with LengthPrefixCoder[ByteArrayCoder] instead of the actual coder. That is to say, that coder still does not take effect in the SDK harness. Only when the coder is added to ModelCoders is it 'known', and only then does it take effect.
>>
>> Correction to this point: a coder which is not contained in the Java ModelCoders is replaced with LengthPrefixCoder[ByteArrayCoder] on the runner side and with LengthPrefixCoder[CustomCoder] on the SDK harness side.
>>
>> The point here is that the runner determines whether it knows a coder according to the coders defined in the Java ModelCoders, not the coders defined in the proto file. So if we take option 3, the non-standard coders which will be wrapped with LengthPrefixCoder should also be determined by the coders defined in the Java ModelCoders, not the coders defined in the proto file.
>
> Yes.
>
> Both as a matter of principle and pragmatics, it'd be good to avoid having anything about the model defined only in Java files.
>
> Also, when we say "the runner" we cannot assume it's written in Java. While many Java OSS runners share these libraries, the Universal Local Runner is written in Python. Dataflow is written (primarily) in C++. My hope is that the FnAPI will be stable enough that one can even run multiple versions of the Java SDK with the same runner.
> What matters is that (1) if the same URN is used, all runners/SDKs agree on the encoding, (2) there are certain coders (Windowed, LengthPrefixed, and KV come to mind) that all runners/SDKs are required to understand, and (3) runners properly coerce coders they do not understand into coders that they do understand if they need to pull out and act on the bytes. The more coders the runner/SDK understands, the less often it needs to do this.
>
>> On Sat, Nov 9, 2019 at 12:26 jincheng sun <sunjincheng...@gmail.com> wrote:
>>
>>> Hi Robert Bradshaw,
>>>
>>> Thanks a lot for the explanation. Very interesting topic!
>>>
>>> Let us first define what "standard coders" are. Usually it should be the coders defined in the Proto. However, personally I think the coders defined in the Java ModelCoders [1] seem more appropriate. The reason is that a coder which already appears in the Proto but has not yet been added to the Java ModelCoders is always replaced by the runner with LengthPrefixCoder[ByteArrayCoder], so the SDK harness will decode the data with LengthPrefixCoder[ByteArrayCoder] instead of the actual coder. That is to say, that coder still does not take effect in the SDK harness. Only when the coder is added to ModelCoders is it 'known', and only then does it take effect.
>>>
>>> So if we take option 3, the non-standard coders which will be wrapped with LengthPrefixCoder should be kept in sync with the coders defined in the Java ModelCoders. (From this point of view, option 1 seems cleaner!)
>>>
>>> Please correct me if I missed something. Thanks a lot!
>>>
>>> Best,
>>> Jincheng
>>>
>>> [1] https://github.com/apache/beam/blob/01726e9c62313749f9ea7c93063a1178abd1a8db/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java#L59
>>>
>>> On Sat, Nov 9, 2019 at 8:46 AM Robert Burke <rob...@frantil.com> wrote:
>>>
>>>> And by "I wasn't clear" I meant "I misread the options".
>>>>
>>>> On Fri, Nov 8, 2019, 4:14 PM Robert Burke <rob...@frantil.com> wrote:
>>>>
>>>>> Reading back, I wasn't clear: the Go SDK does Option (1), putting the LP explicitly during encoding [1] for the runner proto, and explicitly expects LPs to contain a custom coder URN on decode for execution [2]. (Modulo an old bug in Dataflow where the URN was empty.)
>>>>>
>>>>> [1] https://github.com/apache/beam/blob/4364a214dfe6d8d5dd84b1bb91d579f466492ca5/sdks/go/pkg/beam/core/runtime/graphx/coder.go#L348
>>>>> [2] https://github.com/apache/beam/blob/4364a214dfe6d8d5dd84b1bb91d579f466492ca5/sdks/go/pkg/beam/core/runtime/graphx/coder.go#L219
>>>>>
>>>>> On Fri, Nov 8, 2019, 10:28 AM Robert Bradshaw <rober...@google.com> wrote:
>>>>>
>>>>>> On Fri, Nov 8, 2019 at 2:09 AM jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Sorry for my late reply. It seems a conclusion has been reached. I just want to share my personal thoughts.
>>>>>>>
>>>>>>> Generally, both options 1 and 3 make sense to me.
>>>>>>>
>>>>>>>>> The key concept here is not "standard coder" but "coder that the runner does not understand." This knowledge is only in the runner. Also has the downside of (2).
>>>>>>>>
>>>>>>>> Yes, I had assumed "non-standard" and "unknown" are the same, but the former can be a subset of the latter, i.e. if a Runner does not support all of the standard coders for some reason.
>>>>>>>
>>>>>>> I also assume that "non-standard" and "unknown" are the same. Currently, the runner side [1] decides whether a coder is unknown (and wraps it with a length prefix coder) according to whether it is among the standard coders. It does not communicate with the harness to make this decision.
>>>>>>>
>>>>>>> So, from my point of view, we can update the PR according to option 1 or 3.
> >>>>> > > >>>>> > [1] > https://github.com/apache/beam/blob/66a67e6580b93906038b31ae7070204cec90999c/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java#L62 > >>>>> > >>>>> That list is populated in Java code [1] and has typically been a > >>>>> subset of what is in the proto file. Things like StringUtf8Coder and > >>>>> DoubleCoder have been added at different times to different SDKs and > >>>>> Runners, sometimes long after the URN is in the proto. Having to keep > >>>>> this list synchronized (and versioned) would be a regression. > >>>>> > >>>>> [1] > https://github.com/apache/beam/blob/release-2.17.0/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java > >>>>> > >>>>> The PR taking approach (1) looks good at a first glance (I see others > >>>>> are reviewing it). Thanks. > >>>>> > >>>>> > Maximilian Michels <m...@apache.org> 于2019年11月8日周五 上午3:35写道: > >>>>> >> > >>>>> >> > While the Go SDK doesn't yet support a State API, Option 3) is > what the Go SDK does for all non-standard coders (aka custom coders) anyway. > >>>>> >> > >>>>> >> For wire transfer, the Java Runner also adds a LengthPrefixCoder > for the > >>>>> >> coder and its subcomponents. The problem is that this is an > implicit > >>>>> >> assumption made. In the Proto, we do not have this represented. > This is > >>>>> >> why **for state requests**, we end up with a > >>>>> >> "LengthPrefixCoder[CustomCoder]" on the Runner and a > "CustomCoder" on > >>>>> >> the SDK Harness side. Note that the Python Harness does wrap > unknown > >>>>> >> coders in a LengthPrefixCoder for transferring regular elements, > but the > >>>>> >> LengthPrefixCoder is not preserved for the state requests. > >>>>> >> > >>>>> >> In that sense (3) is good because it follows this implicit notion > of > >>>>> >> adding a LengthPrefixCoder for wire transfer, but applies it to > state > >>>>> >> requests. 
>>>>>>>>
>>>>>>>> However, option (1) is the most reliable because the LengthPrefixCoder is actually in the Proto. So "CustomCoder" will always be represented as "LengthPrefixCoder[CustomCoder]", and only standard coders will be added without a LengthPrefixCoder.
>>>>>>>>
>>>>>>>>> I'd really like to avoid implicit agreements about how the coder that should be used differs from what's specified in the proto in different contexts.
>>>>>>>>
>>>>>>>> Option (2) would work on top of the existing logic because the "NOOP coder" replacing a non-standard coder would only be used by the Runner to produce a serialized version of the key for partitioning. Flink always operates on the serialized key, whether the coder is standard or non-standard. It wouldn't be necessary to change any of the existing wire transfer logic or representation. I understand that it would be less ideal, but it may be easier to fix for the release.
>>>>>>>>
>>>>>>>>> The key concept here is not "standard coder" but "coder that the runner does not understand." This knowledge is only in the runner. Also has the downside of (2).
>>>>>>>>
>>>>>>>> Yes, I had assumed "non-standard" and "unknown" are the same, but the former can be a subset of the latter, i.e. if a Runner does not support all of the standard coders for some reason.
>>>>>>>>
>>>>>>>>> This means that the wire format that the runner sends for the "key" represents the exact same wire format it will receive for state requests.
>>>>>>>>
>>>>>>>> The wire format for the entire element is the same. Otherwise we wouldn't be able to process data between the Runner and the SDK Harness.
>>>>>>>> However, the problem is that the way the Runner instantiates the key coder to partition elements does not match how the SDK encodes the key when it sends a state request to the Runner. Conceptually, those two situations should be the same, but in practice they are not.
>>>>>>>>
>>>>>>>> Now that I have thought about it again, option (1) is probably the most explicit and, in that sense, the cleanest. However, option (3) is fair enough because it would simply replicate, for state requests, the implicit LengthPrefixCoder behavior we already have for general wire transfer. Option (2), I suppose, is the most implicit and runner-specific, and should probably be avoided in the long run.
>>>>>>>>
>>>>>>>> So I'd probably opt for (1), and I would update the PR [1] soon because this currently blocks the release, as it is a regression from 2.16.0 [2].
>>>>>>>>
>>>>>>>> -Max
>>>>>>>>
>>>>>>>> [1] https://github.com/apache/beam/pull/9997
>>>>>>>> [2] (In 2.16.0 it worked for Python because the Runner used a ByteArrayCoder with the OUTER encoding context for the key, which was basically option (2). The only problem was that, for standard coders, the Java SDK Harness produced non-matching state request keys because it used the NESTED context.)
>>>>>>>>
>>>>>>>> On 07.11.19 18:01, Luke Cwik wrote:
>>>>>>>>
>>>>>>>>> On Thu, Nov 7, 2019 at 8:22 AM Robert Bradshaw <rober...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> On Thu, Nov 7, 2019 at 6:26 AM Maximilian Michels <m...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks for the feedback thus far.
>>>>>>>>>>> Some more comments:
>>>>>>>>>>>
>>>>>>>>>>>> Instead, the runner knows ahead of time that it will need to instantiate this coder, and should update the bundle processor to specify KvCoder<LengthPrefixCoder<CustomCoder>, VarIntCoder> as the coder so both can pull it out in a consistent way.
>>>>>>>>>>>
>>>>>>>>>>> By "update the bundle processor", do you mean modifying the ProcessBundleDescriptor's BagUserState with the correct key coder? Conceptually that is possible, but the current implementation does not allow for this to happen:
>>>>>>>>>>>
>>>>>>>>>>> https://github.com/apache/beam/blob/076a037e53ca39b61c1dfeb580527bc8d0371dc1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java#L284
>>>>>>>>>>>
>>>>>>>>>>> It enforces ByteString, which does not tell the SDK Harness anything about the desired encoding.
>>>>>>>>>>
>>>>>>>>>> I meant updating the ProcessBundleDescriptor proto that is sent to the SDK, https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/beam_fn_api.proto#L140, essentially option (1).
>>>>>>>>>
>>>>>>>>> For clarity, the "key" coder is specified by the stateful ParDo's main input PCollection. This means that the ProcessBundleDescriptor should have the length prefix as part of both the remote gRPC port specification AND the PCollection that follows it, which is the main input for the stateful ParDo. This means that the wire format that the runner sends for the "key" represents the exact same wire format it will receive for state requests.
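A minimal sketch of why Luke's suggestion keeps both sides consistent, assuming the length prefix is present in the PCollection coder the runner sends: the SDK encodes elements as KV<LengthPrefix<Custom>, VarInt>, the runner reads the same bytes as KV<LengthPrefix<Bytes>, VarInt> without understanding the custom coder, and the key bytes it re-encodes are identical to what the SDK sends with a state request. All function names and the raw-UTF-8 "custom" coder are hypothetical, and the varint here handles non-negative integers only.

```python
def encode_varint(n: int) -> bytes:
    """Base-128 varint, low 7 bits first; non-negative ints only in this sketch."""
    if n < 0:
        raise ValueError("sketch only handles non-negative ints")
    out = bytearray()
    while True:
        low, n = n & 0x7F, n >> 7
        if n:
            out.append(low | 0x80)
        else:
            out.append(low)
            return bytes(out)


def decode_varint(buf: bytes, pos: int = 0):
    """Decode a varint starting at pos; return (value, position after it)."""
    value = shift = 0
    while True:
        b = buf[pos]
        pos += 1
        value |= (b & 0x7F) << shift
        if not b & 0x80:
            return value, pos
        shift += 7


def length_prefix(payload: bytes) -> bytes:
    return encode_varint(len(payload)) + payload


def custom_encode(key: str) -> bytes:
    """Hypothetical SDK-only key coder: raw UTF-8, not self-delimiting."""
    return key.encode("utf-8")


def sdk_encode_element(key: str, value: int) -> bytes:
    """SDK side: element coder KV<LengthPrefix<Custom>, VarInt>."""
    return length_prefix(custom_encode(key)) + encode_varint(value)


def sdk_state_key(key: str) -> bytes:
    """SDK side: state-request key, encoded with the same key coder."""
    return length_prefix(custom_encode(key))


def runner_key_bytes(element: bytes) -> bytes:
    """Runner side: read the key as LengthPrefix<Bytes> and re-encode it.

    The runner never decodes the custom coder; it only needs the prefix to
    find where the opaque key bytes end.
    """
    size, pos = decode_varint(element)
    opaque_key = element[pos:pos + size]
    return length_prefix(opaque_key)


if __name__ == "__main__":
    element = sdk_encode_element("user-1", 300)
    print(runner_key_bytes(element) == sdk_state_key("user-1"))  # True
```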
>>>>>>>>>
>>>>>>>>> I see what you mean, Max. The code at https://github.com/apache/beam/blob/076a037e53ca39b61c1dfeb580527bc8d0371dc1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java#L284 could change to represent the actual coder (whether it be LengthPrefixCoder<BytesCoder> or some other model coder combination). Currently that logic assumes that the runner can perform the backwards mapping by decoding the bytestring with the appropriate coder.
>>>>>>>>>
>>>>>>>>>>> Since the above does not seem feasible, I see the following options:
>>>>>>>>>>>
>>>>>>>>>>> (1) Modify the pipeline Proto before the translation and wrap a LengthPrefixCoder around non-standard key coders for stateful transforms. This would change the encoding of the entire element, to ensure that the key coder used for state requests from the SDK Harness contains a LengthPrefixCoder. Not optimal.
>>>>>>>>>>
>>>>>>>>>> Yes. The contract should be that both the runner and the SDK use the coders that are specified in the proto. The runner controls the proto, and should ensure it only sends protos for which it can handle the SDK's responses. I'm not seeing why this is sub-optimal.
>>>>>>>>>>
>>>>>>>>>>> (2) Add a new method WireCoders#instantiateRunnerWireKeyCoder which returns the correct key coder, i.e. for standard coders, the concrete coder, and for non-standard coders a ByteArrayCoder. We also need to ensure the key encoding on the Runner side uses the OUTER context, to avoid adding a length prefix to the encoded bytes.
Basically, > the > >>>>> >> > non-standard > >>>>> >> > > coders result in a NOOP coder which does not touch the > key bytes. > >>>>> >> > > >>>>> >> > I'd really like to avoid implicit agreements about how the > coder that > >>>>> >> > should be used differs from what's specified in the proto > in different > >>>>> >> > contexts. > >>>>> >> > > >>>>> >> > > (3) Patch the Python SDK to ensure non-standard state > key coders are > >>>>> >> > > always wrapped in a LengthPrefixCoder. That way, we can > keep the > >>>>> >> > > existing logic on the Runner side. > >>>>> >> > > >>>>> >> > The key concept here is not "standard coder" but "coder > that the > >>>>> >> > runner does not understand." This knowledge is only in the > runner. > >>>>> >> > Also has the downside of (2). > >>>>> >> > > >>>>> >> > > Option (2) seems like the most practical. > >>>>> >> > > > >>>>> >> > > -Max > >>>>> >> > > > >>>>> >> > > On 06.11.19 17:26, Robert Bradshaw wrote: > >>>>> >> > > > On Wed, Nov 6, 2019 at 2:55 AM Maximilian Michels > >>>>> >> > <m...@apache.org <mailto:m...@apache.org>> wrote: > >>>>> >> > > >> > >>>>> >> > > >> Let me try to clarify: > >>>>> >> > > >> > >>>>> >> > > >>> The Coder used for State/Timers in a StatefulDoFn is > pulled > >>>>> >> > out of the > >>>>> >> > > >>> input PCollection. If a Runner needs to partition by > this > >>>>> >> > coder, it > >>>>> >> > > >>> should ensure the coder of this PCollection matches > with the > >>>>> >> > Coder > >>>>> >> > > >>> used to create the serialized bytes that are used for > >>>>> >> > partitioning > >>>>> >> > > >>> (whether or not this is length-prefixed). > >>>>> >> > > >> > >>>>> >> > > >> That is essentially what I had assumed when I wrote > the code. The > >>>>> >> > > >> problem is the coder can be "pulled out" in different > ways. 
>>>>>>>>>>>>>
>>>>>>>>>>>>> For example, let's say we have the following Proto PCollection coder with the non-standard coder "CustomCoder" as the key coder:
>>>>>>>>>>>>>
>>>>>>>>>>>>>     KvCoder<CustomCoder, VarIntCoder>
>>>>>>>>>>>>>
>>>>>>>>>>>>> From the Runner side, this currently looks like the following:
>>>>>>>>>>>>>
>>>>>>>>>>>>>     PCol: KvCoder<LengthPrefixCoder<ByteArrayCoder>, VarIntCoder>
>>>>>>>>>>>>>     Key:  LengthPrefixCoder<ByteArrayCoder>
>>>>>>>>>>>>
>>>>>>>>>>>> This is, I think, where the error is. If the proto references KvCoder<CustomCoder, VarIntCoder>, it should not be pulled out as KvCoder<LengthPrefixCoder<ByteArrayCoder>, VarIntCoder>, as that doesn't have the same encoding. Trying to instantiate such a coder should be an error. Instead, the runner knows ahead of time that it will need to instantiate this coder, and should update the bundle processor to specify KvCoder<LengthPrefixCoder<CustomCoder>, VarIntCoder> as the coder so both can pull it out in a consistent way.
>>>>>>>>>>>>
>>>>>>>>>>>> When the coder is KvCoder<LengthPrefixCoder<CustomCoder>, VarIntCoder>, instantiating it as KvCoder<ByteArrayCoder, VarIntCoder> on the runner is of course OK, as they do have the same encoding.
>>>>>>>>>>>>
>>>>>>>>>>>>> At the SDK Harness, we have the coder available:
>>>>>>>>>>>>>
>>>>>>>>>>>>>     PCol: KvCoder<CustomCoder, VarIntCoder>
>>>>>>>>>>>>>     Key:  CustomCoder
>>>>>>>>>>>>>
>>>>>>>>>>>>> Currently, when the SDK Harness serializes a key for a state request, the custom coder may happen to add a length prefix, or it may not.
>>>>>>>>>>>>> It depends on the coder used. The correct behavior would be to use the same representation as on the Runner side.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Specifically, "We have no way of telling from the Runner side, if a length prefix has been used or not." seems false
>>>>>>>>>>>>>
>>>>>>>>>>>>> The Runner cannot inspect an unknown coder; it only has the opaque Proto information available, which does not allow introspection of non-standard coders. With the current state, the Runner may think the coder adds a length prefix, but the Python SDK worker could choose to add none. This produces an inconsistent key encoding. See above.
>>>>>>>>>>>>
>>>>>>>>>>>> I think what's being conflated here is "the Coder has been wrapped in a LengthPrefixCoder" vs. "the coder does length prefixing." These are two orthogonal concepts. The runner in general only knows the former.
>>>>>>>>>>>>
>>>>>>>>>>>>> It looks like the key encoding for state requests on the Python SDK Harness side is broken. For transferring elements of a PCollection, the coders are obviously working correctly, but for encoding solely the key of an element, there is a consistency issue.
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Max
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 06.11.19 05:35, Kenneth Knowles wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Specifically, "We have no way of telling from the Runner side, if a length prefix has been used or not." seems false. The runner has all the information since length prefix is a model coder.
>>>>>>>>>>>>>> Didn't we agree that all coders should be self-delimiting in runner/SDK interactions, requiring length-prefix only when there is an opaque or dynamic-length value? I assume you mean that at runtime the worker for a given engine does not know?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Nov 5, 2019 at 3:19 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> +1 to what Robert said.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Nov 5, 2019 at 2:36 PM Robert Bradshaw <rober...@google.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The Coder used for State/Timers in a StatefulDoFn is pulled out of the input PCollection. If a Runner needs to partition by this coder, it should ensure the coder of this PCollection matches the Coder used to create the serialized bytes that are used for partitioning (whether or not this is length-prefixed).
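Kenn's question above about self-delimiting coders can be illustrated with a hypothetical coder that writes raw UTF-8 and nothing else. Placed in front of another value, its end cannot be found, so two different key/value pairs collapse to the same bytes unless the key is length-prefixed. The helpers below are illustrative names for this sketch only, not Beam APIs.

```python
def encode_varint(n: int) -> bytes:
    """Base-128 varint, low 7 bits first; non-negative ints only in this sketch."""
    if n < 0:
        raise ValueError("sketch only handles non-negative ints")
    out = bytearray()
    while True:
        low, n = n & 0x7F, n >> 7
        if n:
            out.append(low | 0x80)
        else:
            out.append(low)
            return bytes(out)


def length_prefix(payload: bytes) -> bytes:
    return encode_varint(len(payload)) + payload


def raw_utf8(s: str) -> bytes:
    """Hypothetical non-self-delimiting coder: nothing marks where s ends."""
    return s.encode("utf-8")


def kv_unprefixed(key: str, value: str) -> bytes:
    """Key and value written back to back; the key boundary is lost."""
    return raw_utf8(key) + raw_utf8(value)


def kv_prefixed_key(key: str, value: str) -> bytes:
    """Key wrapped in a length prefix; the value, being last, may run to the end."""
    return length_prefix(raw_utf8(key)) + raw_utf8(value)


if __name__ == "__main__":
    print(kv_unprefixed("ab", "c") == kv_unprefixed("a", "bc"))      # True: ambiguous
    print(kv_prefixed_key("ab", "c") == kv_prefixed_key("a", "bc"))  # False
```

Varint-encoded integers, by contrast, already mark their own end (the high bit of the last byte is clear), which is why a coder like VarInt does not need a prefix for the decoder to find the boundary.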
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Concretely, the graph looks like
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     Runner                                   SDK Harness
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     WriteToGbk
>>>>>>>>>>>>>>>>         |
>>>>>>>>>>>>>>>>     ReadFromGbk
>>>>>>>>>>>>>>>>         |
>>>>>>>>>>>>>>>>     RunnerMapFn.processKeyValue(key, value)
>>>>>>>>>>>>>>>>         |
>>>>>>>>>>>>>>>>     WriteToDataChannel
>>>>>>>>>>>>>>>>         ------------------------>
>>>>>>>>>>>>>>>>                                  ReadFromDataChannel
>>>>>>>>>>>>>>>>                                          |
>>>>>>>>>>>>>>>>                                        (pcIn)
>>>>>>>>>>>>>>>>                                          |
>>>>>>>>>>>>>>>>                                  MyStatefulDoFn.process(key, value)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Now the (key part of the) Coder of pcIn, which comes from the proto that the Runner sent to the SDK, must match the (key part of the) encoding used in WriteToGbk and ReadFromGbk. If a LengthPrefix is added in one spot, it must be added in the other.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> [1] https://github.com/apache/beam/blob/release-2.17.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L1183
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tue, Nov 5, 2019 at 1:25 PM Maximilian Michels <m...@apache.org> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I wanted to get your opinion on something that I have been struggling with. It is about the coders for state requests in portable pipelines.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> In contrast to "classic" Beam, the Runner is not guaranteed to know which coder is used by the SDK.
>>>>>>>>>>>>>>>>> If the SDK happens to use a standard coder (also known as model coder), we will also have it available at the Runner, i.e. if the Runner is written in one of the SDK languages (e.g. Java). However, when we do not have a standard coder, we simply treat the data from the SDK as a blob and pass it around as bytes.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Problem
>>>>>>>>>>>>>>>>> =======
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> In the case of state requests which the SDK Harness sends to the Runner, we would like the key associated with the state request to match the key of the element which led to initiating the state request.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Example:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     Runner                      SDK Harness
>>>>>>>>>>>>>>>>>     ------                      -----------
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     KV["key","value"]     -->   Process Element
>>>>>>>>>>>>>>>>>                                       |
>>>>>>>>>>>>>>>>>     LookupState("key")    <--   Request state of "key"
>>>>>>>>>>>>>>>>>            |
>>>>>>>>>>>>>>>>>     State["key"]          -->   Receive state
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> For stateful DoFns, the Runner partitions the data based on the key. In Flink, this partitioning must not change during the lifetime of a pipeline because checkpointing otherwise breaks [0]. The key is extracted from the element and stored encoded.
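Because the runner partitions on the encoded key, the partitioning is only as stable as two things: the key bytes themselves and the function applied to them. A minimal sketch, assuming CRC-32 as a stand-in hash (this is not how Flink actually assigns key groups, and `key_group` is a hypothetical name): the result is a pure function of the bytes, so it survives restarts, whereas Python's built-in `hash()` of `bytes` is salted per process unless PYTHONHASHSEED is fixed.

```python
import zlib


def key_group(encoded_key: bytes, max_parallelism: int) -> int:
    """Assign encoded key bytes to a key group in [0, max_parallelism).

    Depends only on the bytes and uses a fixed checksum (CRC-32), so the
    assignment is identical across processes and restarts.
    """
    if max_parallelism <= 0:
        raise ValueError("max_parallelism must be positive")
    return zlib.crc32(encoded_key) % max_parallelism


if __name__ == "__main__":
    # One logical key, two encodings: two different inputs to the partitioner,
    # so nothing guarantees they land in the same group.
    prefixed, unprefixed = b"\x06user-1", b"user-1"
    print(key_group(prefixed, 128), key_group(unprefixed, 128))
```

The stable hash only helps if every party hands it the same bytes; if the runner and the SDK encode the same logical key differently, the partition and the stored state both diverge.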
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> If we have a standard coder, it is basically the same as in the "classic" Runner, which takes the key and serializes it. However, when we have an SDK-specific coder, we basically do not know how it encodes. So far, we have been using the coder instantiated from the Proto, which is basically a LengthPrefixCoder[ByteArrayCoder] or similar [1]. We have had problems with this because the key encoding of Java SDK state requests did not match the key encoding on the Runner side [2]. In an attempt to fix that, it is now partly broken for portable Python pipelines. "Partly", because it "only" affects non-standard coders.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Non-standard coders yield the aforementioned LengthPrefixCoder[ByteArrayCoder]. Now, following the usual encoding scheme, we would simply encode the key using this coder. However, for state requests, the Python SDK leaves out the length prefix for certain coders, e.g. for primitives like int or byte. It is possible that one coder uses a length prefix, while another doesn't. We have no way of telling from the Runner side, if a length prefix has been used or not.
>>>>>>>>>>>>>>>>> This results in keys that do not match on the Runner side, and the partitioning is broken.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> How to solve this?
>>>>>>>>>>>>>>>>> ==================
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> (1) Should this simply be fixed on the Python SDK side? One fix would be to always add a length prefix to the key in state requests, even for primitive coders like VarInt which do not use one.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> OR
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> (2) Should the Runner detect that a non-standard coder is used? If so, it should just pass the bytes from the SDK Harness and never attempt to construct a coder based on the Proto.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thinking about it now, it seems pretty obvious that (2) is the most feasible way to avoid complications with key encodings across all current and future SDKs. Still, it is odd that the Proto contains coder information which is not usable.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> What do you think?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> Max
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> [0] It is possible to restart the pipeline and repartition the checkpointed data.
>>>>>>>>>>>>>>>>> [1] https://github.com/apache/beam/blob/c39752af5391fe698a2b4f1489c187ddd4d604c0/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java#L682
>>>>>>>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/BEAM-8157
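The mismatch Max describes in his original message can be reproduced in a few lines under simplified assumptions: the runner stores state under the key as LengthPrefixCoder[ByteArrayCoder] would encode it (length prefix plus opaque bytes), while the SDK sends the bare varint bytes in its state request. The dict-based store and every helper name are hypothetical; the second lookup shows what his fix (1), always length-prefixing the state key, changes.

```python
def encode_varint(n: int) -> bytes:
    """Base-128 varint, low 7 bits first; non-negative ints only in this sketch."""
    if n < 0:
        raise ValueError("sketch only handles non-negative ints")
    out = bytearray()
    while True:
        low, n = n & 0x7F, n >> 7
        if n:
            out.append(low | 0x80)
        else:
            out.append(low)
            return bytes(out)


def length_prefix(payload: bytes) -> bytes:
    return encode_varint(len(payload)) + payload


# Hypothetical runner-side state store keyed by encoded key bytes.
runner_state = {}


def runner_store(sdk_key_bytes: bytes, values) -> None:
    """Runner view of the key: LengthPrefixCoder[ByteArrayCoder] around the SDK's bytes."""
    runner_state[length_prefix(sdk_key_bytes)] = list(values)


def runner_lookup(state_request_key: bytes):
    """Return the stored values, or None if the key bytes do not match exactly."""
    return runner_state.get(state_request_key)


sdk_key = encode_varint(42)  # e.g. a small int key encoded as a varint
runner_store(sdk_key, ["a", "b"])

before_fix = runner_lookup(sdk_key)                # prefix left out: miss
after_fix = runner_lookup(length_prefix(sdk_key))  # option (1): always prefix

if __name__ == "__main__":
    print(before_fix, after_fix)  # None ['a', 'b']
```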