And by "I wasn't clear" I meant "I misread the options".
On Fri, Nov 8, 2019, 4:14 PM Robert Burke <[email protected]> wrote: > Reading back, I wasn't clear: the Go SDK does Option (1), putting the LP > explicitly during encoding [1] for the runner proto, and explicitly expects > LPs to contain a custom coder URN on decode for execution [2]. (Modulo an > old bug in Dataflow where the urn was empty) > > > [1] > https://github.com/apache/beam/blob/4364a214dfe6d8d5dd84b1bb91d579f466492ca5/sdks/go/pkg/beam/core/runtime/graphx/coder.go#L348 > [2] > https://github.com/apache/beam/blob/4364a214dfe6d8d5dd84b1bb91d579f466492ca5/sdks/go/pkg/beam/core/runtime/graphx/coder.go#L219 > > > On Fri, Nov 8, 2019, 10:28 AM Robert Bradshaw <[email protected]> wrote: > >> On Fri, Nov 8, 2019 at 2:09 AM jincheng sun <[email protected]> >> wrote: >> > >> > Hi, >> > >> > Sorry for my late reply. It seems the conclusion has been reached. I >> just want to share my personal thoughts. >> > >> > Generally, both option 1 and 3 make sense to me. >> > >> > >> The key concept here is not "standard coder" but "coder that the >> > >> runner does not understand." This knowledge is only in the runner. >> > >> Also has the downside of (2). >> > >> > >Yes, I had assumed "non-standard" and "unknown" are the same, but the >> > >latter can be a subset of the former, i.e. if a Runner does not support >> > >all of the standard coders for some reason. >> > >> > I'm also assume that "non-standard" and "unknown" are the same. >> Currently, in the runner side[1] it >> > decides whether the coder is unknown(wrap with length prefix coder) >> according to whether the coder is among >> > the standard coders. It will not communicate with harness to make this >> decision. >> > >> > So, from my point of view, we can update the PR according to option 1 >> or 3. >> > >> > [1] >> https://github.com/apache/beam/blob/66a67e6580b93906038b31ae7070204cec90999c/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java#L62 >> >> That list is populated in Java code [1] and has typically been a >> subset of what is in the proto file. Things like StringUtf8Coder and >> DoubleCoder have been added at different times to different SDKs and >> Runners, sometimes long after the URN is in the proto. Having to keep >> this list synchronized (and versioned) would be a regression. >> >> [1] >> https://github.com/apache/beam/blob/release-2.17.0/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java >> >> The PR taking approach (1) looks good at a first glance (I see others >> are reviewing it). Thanks. >> >> > Maximilian Michels <[email protected]> 于2019年11月8日周五 上午3:35写道: >> >> >> >> > While the Go SDK doesn't yet support a State API, Option 3) is what >> the Go SDK does for all non-standard coders (aka custom coders) anyway. >> >> >> >> For wire transfer, the Java Runner also adds a LengthPrefixCoder for >> the >> >> coder and its subcomponents. The problem is that this is an implicit >> >> assumption made. In the Proto, we do not have this represented. This is >> >> why **for state requests**, we end up with a >> >> "LengthPrefixCoder[CustomCoder]" on the Runner and a "CustomCoder" on >> >> the SDK Harness side. Note that the Python Harness does wrap unknown >> >> coders in a LengthPrefixCoder for transferring regular elements, but >> the >> >> LengthPrefixCoder is not preserved for the state requests. 
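To make that mismatch concrete, here is a minimal sketch against the Beam Java coder classes (the hard-coded byte payload stands in for whatever a hypothetical CustomCoder would write for a key):

    import java.io.ByteArrayOutputStream;
    import org.apache.beam.sdk.coders.ByteArrayCoder;
    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.coders.LengthPrefixCoder;

    public class KeyMismatchSketch {
      public static void main(String[] args) throws Exception {
        // Pretend these are the bytes a non-standard CustomCoder produces for the key "key".
        byte[] customEncodedKey = new byte[] {0x6b, 0x65, 0x79};

        // Runner side: an unknown key coder is instantiated as LengthPrefixCoder[ByteArrayCoder],
        // so the bytes the Runner partitions on carry a var-int length prefix.
        Coder<byte[]> runnerKeyCoder = LengthPrefixCoder.of(ByteArrayCoder.of());
        ByteArrayOutputStream runnerKeyBytes = new ByteArrayOutputStream();
        runnerKeyCoder.encode(customEncodedKey, runnerKeyBytes);
        System.out.println(runnerKeyBytes.size());      // 4: 0x03 'k' 'e' 'y'

        // SDK Harness side: the state request key is produced by the bare CustomCoder,
        // i.e. just the three payload bytes, so it does not line up with the partition key.
        System.out.println(customEncodedKey.length);    // 3: 'k' 'e' 'y'
      }
    }

Whether the SDK coder happens to add its own prefix is entirely up to that coder, which is exactly the ambiguity described above.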
>> >> >> >> In that sense (3) is good because it follows this implicit notion of >> >> adding a LengthPrefixCoder for wire transfer, but applies it to state >> >> requests. >> >> >> >> However, option (1) is most reliable because the LengthPrefixCoder is >> >> actually in the Proto. So "CustomCoder" will always be represented as >> >> "LengthPrefixCoder[CustomCoder]", and only standard coders will be >> added >> >> without a LengthPrefixCoder. >> >> >> >> > I'd really like to avoid implicit agreements about how the coder that >> >> > should be used differs from what's specified in the proto in >> different >> >> > contexts. >> >> >> >> Option (2) would work on top of the existing logic because replacing a >> >> non-standard coder with a "NOOP coder" would just be used by the Runner >> >> to produce a serialized version of the key for partitioning. Flink >> >> always operates on the serialized key, be it standard or non-standard >> >> coder. It wouldn't be necessary to change any of the existing wire >> >> transfer logic or representation. I understand that it would be less >> >> ideal, but maybe easier to fix for the release. >> >> >> >> > The key concept here is not "standard coder" but "coder that the >> >> > runner does not understand." This knowledge is only in the runner. >> >> > Also has the downside of (2). >> >> >> >> Yes, I had assumed "non-standard" and "unknown" are the same, but the >> >> latter can be a subset of the former, i.e. if a Runner does not support >> >> all of the standard coders for some reason. >> >> >> >> > This means that the wire format that the runner sends for the "key" >> represents the exact same wire format it will receive for state requests. >> >> >> >> The wire format for the entire element is the same. Otherwise we >> >> wouldn't be able to process data between the Runner and the SDK >> Harness. >> >> However, the problem is that the way the Runner instantiates the key >> >> coder to partition elements, does not match how the SDK encodes the key >> >> when it sends a state request to the Runner. Conceptually, those two >> >> situations should be the same, but in practice they are not. >> >> >> >> >> >> Now that I thought about it again option (1) is probably the most >> >> explicit and in that sense cleanest. However, option (3) is kind of >> fair >> >> because it would just replicate the implicit LengthPrefixCoder behavior >> >> we have for general wire transfer also for state requests. Option (2) I >> >> suppose is the most implicit and runner-specific, should probably be >> >> avoided in the long run. >> >> >> >> So I'd probably opt for (1) and I would update the PR[1] rather soon >> >> because this currently blocks the release, as this is a regression from >> >> 2.16.0.[2] >> >> >> >> >> >> -Max >> >> >> >> [1] https://github.com/apache/beam/pull/9997 >> >> [2] (In 2.16.0 it worked for Python because the Runner used a >> >> ByteArrayCoder with the OUTER encoding context for the key which was >> >> basically option (2). Only problem that, for standard coders the Java >> >> SDK Harness produced non-matching state request keys, due to it using >> >> the NESTED context.) >> >> >> >> On 07.11.19 18:01, Luke Cwik wrote: >> >> > >> >> > >> >> > On Thu, Nov 7, 2019 at 8:22 AM Robert Bradshaw <[email protected] >> >> > <mailto:[email protected]>> wrote: >> >> > >> >> > On Thu, Nov 7, 2019 at 6:26 AM Maximilian Michels < >> [email protected] >> >> > <mailto:[email protected]>> wrote: >> >> > > >> >> > > Thanks for the feedback thus far. 
Some more comments: >> >> > > >> >> > > > Instead, the runner knows ahead of time that it >> >> > > > will need to instantiate this coder, and should update the >> bundle >> >> > > > processor to specify KvCoder<LengthPrefixCoder<CustomCoder>, >> >> > > > VarIntCoder> as the coder so both can pull it out in a >> >> > consistent way. >> >> > > >> >> > > By "update the bundle processor", do you mean modifying the >> >> > > ProcessBundleDescriptor's BagUserState with the correct key >> coder? >> >> > > Conceptually that is possible, but the current implementation >> >> > does not >> >> > > allow for this to happen: >> >> > > >> >> > >> https://github.com/apache/beam/blob/076a037e53ca39b61c1dfeb580527bc8d0371dc1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java#L284 >> >> > > It enforces ByteString which does not tell the SDK Harness >> anything >> >> > > about the desired encoding. >> >> > >> >> > I meant update the BundleProcessDescriptor proto that is sent to >> the >> >> > SDK >> >> > >> https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/beam_fn_api.proto#L140 >> , >> >> > essentially option (1). >> >> > >> >> > >> >> > For clarity, the "key" coder is specified by the stateful ParDo's >> main >> >> > input PCollection. This means that the ProcessBundleDescriptor should >> >> > have something that has the length prefix as part of the remote grpc >> >> > port specification AND the PCollection that follows it which is the >> main >> >> > input for the stateful ParDo. This means that the wire format that >> the >> >> > runner sends for the "key" represents the exact same wire format it >> will >> >> > receive for state requests. >> >> > >> >> > I see what you mean Max, >> >> > >> https://github.com/apache/beam/blob/076a037e53ca39b61c1dfeb580527bc8d0371dc1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java#L284 >> >> > could change to represent the actual coder (whether it be >> >> > LengthPrefixCoder<BytesCoder> or some other model coder combination). >> >> > Currently that logic assumes that the runner can perform the >> backwards >> >> > mapping by decoding the bytestring with the appropriate coder. >> >> > >> >> > > Since the above does not seem feasible, I see the following >> options: >> >> > > >> >> > > (1) Modify the pipeline Proto before the translation and wrap >> a >> >> > > LengthPrefixCoder around non-standard key coders for stateful >> >> > > transforms. This would change the encoding for the entire >> >> > element, to be >> >> > > sure that the key coder for state requests contains a >> >> > LengthPrefixCoder >> >> > > for state requests from the SDK Harness. Not optimal. >> >> > >> >> > Yes. The contract should be that both the runner and SDK use the >> >> > coders that are specified in the proto. The runner controls the >> proto, >> >> > and should ensure it only sends protos it will be able to handle >> the >> >> > SDK responding to. I'm not seeing why this is sub-optimal. >> >> > >> >> > > (2) Add a new method WireCoders#instantiateRunnerWireKeyCoder >> which >> >> > > returns the correct key coder, i.e. for standard coders, the >> concrete >> >> > > coder, and for non-standard coders a ByteArrayCoder. We also >> need to >> >> > > ensure the key encoding on the Runner side is OUTER context, >> to avoid >> >> > > adding a length prefix to the encoded bytes. 
Basically, the >> >> > non-standard >> >> > > coders result in a NOOP coder which does not touch the key >> bytes. >> >> > >> >> > I'd really like to avoid implicit agreements about how the coder >> that >> >> > should be used differs from what's specified in the proto in >> different >> >> > contexts. >> >> > >> >> > > (3) Patch the Python SDK to ensure non-standard state key >> coders are >> >> > > always wrapped in a LengthPrefixCoder. That way, we can keep >> the >> >> > > existing logic on the Runner side. >> >> > >> >> > The key concept here is not "standard coder" but "coder that the >> >> > runner does not understand." This knowledge is only in the >> runner. >> >> > Also has the downside of (2). >> >> > >> >> > > Option (2) seems like the most practical. >> >> > > >> >> > > -Max >> >> > > >> >> > > On 06.11.19 17:26, Robert Bradshaw wrote: >> >> > > > On Wed, Nov 6, 2019 at 2:55 AM Maximilian Michels >> >> > <[email protected] <mailto:[email protected]>> wrote: >> >> > > >> >> >> > > >> Let me try to clarify: >> >> > > >> >> >> > > >>> The Coder used for State/Timers in a StatefulDoFn is >> pulled >> >> > out of the >> >> > > >>> input PCollection. If a Runner needs to partition by this >> >> > coder, it >> >> > > >>> should ensure the coder of this PCollection matches with >> the >> >> > Coder >> >> > > >>> used to create the serialized bytes that are used for >> >> > partitioning >> >> > > >>> (whether or not this is length-prefixed). >> >> > > >> >> >> > > >> That is essentially what I had assumed when I wrote the >> code. The >> >> > > >> problem is the coder can be "pulled out" in different ways. >> >> > > >> >> >> > > >> For example, let's say we have the following Proto >> PCollection >> >> > coder >> >> > > >> with non-standard coder "CustomCoder" as the key coder: >> >> > > >> >> >> > > >> KvCoder<CustomCoder, VarIntCoder> >> >> > > >> >> >> > > >> From the Runner side, this currently looks like the >> following: >> >> > > >> >> >> > > >> PCol: KvCoder<LengthPrefixCoder<ByteArrayCoder>, >> VarIntCoder> >> >> > > >> Key: LengthPrefixCoder<ByteArrayCoder> >> >> > > > >> >> > > > This is I think where the error is. If the proto >> references >> >> > > > KvCoder<CustomCoder, VarIntCoder> it should not be pulled >> out as >> >> > > > KvCoder<LengthPrefixCoder<ByteArrayCoder>, VarIntCoder>; as >> that >> >> > > > doesn't have the same encoding. Trying to instantiate >> such a >> >> > coder >> >> > > > should be an error. Instead, the runner knows ahead of time >> that it >> >> > > > will need to instantiate this coder, and should update the >> bundle >> >> > > > processor to specify KvCoder<LengthPrefixCoder<CustomCoder>, >> >> > > > VarIntCoder> as the coder so both can pull it out in a >> >> > consistent way. >> >> > > > >> >> > > > When the coder is KvCoder<LengthPrefixCoder<CustomCoder>, >> >> > VarIntCoder> >> >> > > > instantiating it as KvCoder<ByteArrayCoder, VarIntCoder> on >> the >> >> > runner >> >> > > > is of course OK as they do have the same encoding. >> >> > > > >> >> > > >> At the SDK Harness, we have the coder available: >> >> > > >> >> >> > > >> PCol: KvCoder<CustomCoder, VarIntCoder> >> >> > > >> Key: CustomCoder >> >> > > >> >> >> > > >> Currently, when the SDK Harness serializes a key for a >> state >> >> > request, >> >> > > >> the custom coder may happen to add a length prefix, or it >> may >> >> > not. It >> >> > > >> depends on the coder used.
The correct behavior would be to >> >> > use the same >> >> > > >> representation as on the Runner side. >> >> > > >> >> >> > > >>> Specifically, "We have no way of telling from the Runner >> >> > side, if a length prefix has been used or not." seems false >> >> > > >> >> >> > > >> The Runner cannot inspect an unknown coder, it only has the >> >> > opaque Proto >> >> > > >> information available which does not allow introspection of >> >> > non-standard >> >> > > >> coders. With the current state, the Runner may think the >> coder >> >> > adds a >> >> > > >> length prefix but the Python SDK worker could choose to add >> >> > none. This >> >> > > >> produces an inconsistent key encoding. See above. >> >> > > > >> >> > > > I think what's being conflated here is "the Coder has been >> >> > wrapped in >> >> > > > a LengthPrefixCoder" vs. "the coder does length prefixing." >> >> > These are >> >> > > > two orthogonal concepts. The runner in general only knows >> the >> >> > former. >> >> > > > >> >> > > >> It looks like the key encoding for state requests on the >> >> > Python SDK >> >> > > >> Harness side is broken. For transferring elements of a >> >> > PCollection, the >> >> > > >> coders are obviously working correctly, but for encoding >> >> > solely the key >> >> > > >> of an element, there is a consistency issue. >> >> > > >> >> >> > > >> >> >> > > >> -Max >> >> > > >> >> >> > > >> On 06.11.19 05:35, Kenneth Knowles wrote: >> >> > > >>> Specifically, "We have no way of telling from the Runner >> >> > side, if a >> >> > > >>> length prefix has been used or not." seems false. The >> runner >> >> > has all the >> >> > > >>> information since length prefix is a model coder. Didn't >> we >> >> > agree that >> >> > > >>> all coders should be self-delimiting in runner/SDK >> interactions, >> >> > > >>> requiring length-prefix only when there is an opaque or >> >> > dynamic-length >> >> > > >>> value? I assume you mean that at runtime the worker for a >> >> > given engine >> >> > > >>> does not know? >> >> > > >>> >> >> > > >>> Kenn >> >> > > >>> >> >> > > >>> On Tue, Nov 5, 2019 at 3:19 PM Luke Cwik < >> [email protected] >> >> > <mailto:[email protected]> >> >> > > >>> <mailto:[email protected] <mailto:[email protected]>>> >> wrote: >> >> > > >>> >> >> > > >>> +1 to what Robert said. >> >> > > >>> >> >> > > >>> On Tue, Nov 5, 2019 at 2:36 PM Robert Bradshaw >> >> > <[email protected] <mailto:[email protected]> >> >> > > >>> <mailto:[email protected] >> >> > <mailto:[email protected]>>> wrote: >> >> > > >>> >> >> > > >>> The Coder used for State/Timers in a >> StatefulDoFn is >> >> > pulled out >> >> > > >>> of the >> >> > > >>> input PCollection. If a Runner needs to >> partition by >> >> > this coder, it >> >> > > >>> should ensure the coder of this PCollection >> matches >> >> > with the Coder >> >> > > >>> used to create the serialized bytes that are used >> >> > for partitioning >> >> > > >>> (whether or not this is length-prefixed). 
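Stated in code, that invariant is roughly the following (a sketch assuming the Beam Java coder classes; the KvCoder shape is the one discussed in this thread):

    import java.io.ByteArrayOutputStream;
    import org.apache.beam.sdk.coders.ByteArrayCoder;
    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.coders.KvCoder;
    import org.apache.beam.sdk.coders.LengthPrefixCoder;
    import org.apache.beam.sdk.coders.VarIntCoder;
    import org.apache.beam.sdk.values.KV;

    public class PartitionKeyInvariant {
      public static void main(String[] args) throws Exception {
        // Coder of the stateful ParDo's main-input PCollection, as instantiated from the proto.
        KvCoder<byte[], Integer> inputCoder =
            KvCoder.of(LengthPrefixCoder.of(ByteArrayCoder.of()), VarIntCoder.of());
        Coder<byte[]> keyCoder = inputCoder.getKeyCoder();

        // The bytes the Runner partitions an element by...
        KV<byte[], Integer> element = KV.of(new byte[] {0x6b, 0x65, 0x79}, 42);
        ByteArrayOutputStream partitionKey = new ByteArrayOutputStream();
        keyCoder.encode(element.getKey(), partitionKey);

        // ...must be byte-for-byte what arrives as the key of a state request for that
        // element, i.e. produced by this same keyCoder, length prefix included.
        System.out.println(partitionKey.size());
      }
    }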
>> >> > > >>> >> >> > > >>> Concretely, the graph looks like >> >> > > >>> >> >> > > >>> >> >> > > >>> Runner SDK Harness >> >> > > >>> >> >> > > >>> WriteToGbk >> >> > > >>> | >> >> > > >>> ReadFromGbk >> >> > > >>> | >> >> > > >>> RunnerMapFn.processKeyValue(key, value) >> >> > > >>> | >> >> > > >>> WriteToDataChannel >> >> > > >>> ------------------------> >> >> > > >>> ReadFromDataChannel >> >> > > >>> | >> >> > > >>> (pcIn) >> >> > > >>> | >> >> > > >>> MyStatefulDoFn.process(key, >> value) >> >> > > >>> >> >> > > >>> Now the (key part of the) Coder of pcIn, which >> comes >> >> > from the proto >> >> > > >>> that the Runner sent to the SDK, must match the >> (key >> >> > part of the) >> >> > > >>> encoding used in WriteToGbk and ReadFromGbk. If a >> >> > LenthPrefix is >> >> > > >>> added >> >> > > >>> in one spot, it must be added in the other. >> >> > > >>> >> >> > > >>> >> >> > > >>> [1] >> >> > > >>> >> >> > >> https://github.com/apache/beam/blob/release-2.17.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L1183 >> >> > > >>> >> >> > > >>> On Tue, Nov 5, 2019 at 1:25 PM Maximilian Michels >> >> > > >>> <[email protected] <mailto:[email protected]> >> >> > <mailto:[email protected] <mailto:[email protected]>>> wrote: >> >> > > >>> > >> >> > > >>> > Hi, >> >> > > >>> > >> >> > > >>> > I wanted to get your opinion on something >> that I >> >> > have been >> >> > > >>> struggling >> >> > > >>> > with. It is about the coders for state >> requests >> >> > in portable >> >> > > >>> pipelines. >> >> > > >>> > >> >> > > >>> > In contrast to "classic" Beam, the Runner is >> not >> >> > guaranteed >> >> > > >>> to know >> >> > > >>> > which coder is used by the SDK. If the SDK >> >> > happens to use a >> >> > > >>> standard >> >> > > >>> > coder (also known as model coder), we will >> also >> >> > have it >> >> > > >>> available at the >> >> > > >>> > Runner, i.e. if the Runner is written in one >> of >> >> > the SDK >> >> > > >>> languages (e.g. >> >> > > >>> > Java). However, when we do not have a standard >> >> > coder, we just >> >> > > >>> treat the >> >> > > >>> > data from the SDK as a blob and just pass it >> >> > around as bytes. >> >> > > >>> > >> >> > > >>> > Problem >> >> > > >>> > ======= >> >> > > >>> > >> >> > > >>> > In the case of state requests which the SDK >> >> > Harness authors >> >> > > >>> to the >> >> > > >>> > Runner, we would like for the key associated >> with >> >> > the state >> >> > > >>> request to >> >> > > >>> > match the key of the element which led to >> >> > initiating the >> >> > > >>> state request. >> >> > > >>> > >> >> > > >>> > Example: >> >> > > >>> > >> >> > > >>> > Runner SDK Harness >> >> > > >>> > ------ ----------- >> >> > > >>> > >> >> > > >>> > KV["key","value"] --> Process Element >> >> > > >>> > | >> >> > > >>> > LookupState("key") <-- Request state of "key" >> >> > > >>> > | >> >> > > >>> > State["key"] --> Receive state >> >> > > >>> > >> >> > > >>> > >> >> > > >>> > For stateful DoFns, the Runner partitions the >> >> > data based on >> >> > > >>> the key. In >> >> > > >>> > Flink, this partitioning must not change >> during >> >> > the lifetime of a >> >> > > >>> > pipeline because the checkpointing otherwise >> >> > breaks[0]. The >> >> > > >>> key is >> >> > > >>> > extracted from the element and stored encoded. >> >> > > >>> > >> >> > > >>> > If we have a standard coder, it is basically >> the >> >> > same as in the >> >> > > >>> > "classic" Runner which takes the key and >> >> > serializes it. 
>> >> > > >>> However, when we >> >> > > >>> > have an SDK-specific coder, we basically do >> not >> >> > know how it >> >> > > >>> encodes. So >> >> > > >>> > far, we have been using the coder instantiated >> >> > from the >> >> > > >>> Proto, which is >> >> > > >>> > basically a LengthPrefixCoder[ByteArrayCoder] >> or >> >> > similar[1]. >> >> > > >>> We have had >> >> > > >>> > problems with this because the key encoding of >> >> > Java SDK state >> >> > > >>> requests >> >> > > >>> > did not match the key encoding on the Runner >> side >> >> > [2]. In an >> >> > > >>> attempt to >> >> > > >>> > fix those, it is now partly broken for >> portable >> >> > Python pipelines. >> >> > > >>> > Partly, because it "only" affects non-standard >> >> > coders. >> >> > > >>> > >> >> > > >>> > Non-standard coders yield the aforementioned >> >> > > >>> > LengthPrefixCoder[ByteArrayCoder]. Now, >> following >> >> > the usual >> >> > > >>> encoding >> >> > > >>> > scheme, we would simply encode the key using >> this >> >> > coder. >> >> > > >>> However, for >> >> > > >>> > state requests, the Python SDK leaves out the >> >> > length prefix >> >> > > >>> for certain >> >> > > >>> > coders, e.g. for primitives like int or byte. >> It >> >> > is possible >> >> > > >>> that one >> >> > > >>> > coder uses a length prefix, while another >> >> > doesn't. We have no >> >> > > >>> way of >> >> > > >>> > telling from the Runner side, if a length >> prefix >> >> > has been >> >> > > >>> used or not. >> >> > > >>> > This results in the keys to not match on the >> >> > Runner side and the >> >> > > >>> > partitioning to be broken. >> >> > > >>> > >> >> > > >>> > >> >> > > >>> > How to solve this? >> >> > > >>> > ================== >> >> > > >>> > >> >> > > >>> > (1) Should this simply be fixed on the Python >> SDK >> >> > side? One >> >> > > >>> fix would be >> >> > > >>> > to always append a length prefix to the key >> in state >> >> > > >>> requests, even for >> >> > > >>> > primitive coders like VarInt which do not use >> one. >> >> > > >>> > >> >> > > >>> > OR >> >> > > >>> > >> >> > > >>> > (2) Should the Runner detect that a >> non-standard >> >> > coder is >> >> > > >>> used? If so, >> >> > > >>> > it should just pass the bytes from the SDK >> >> > Harness and never >> >> > > >>> make an >> >> > > >>> > attempt to construct a coder based on the >> Proto. >> >> > > >>> > >> >> > > >>> > >> >> > > >>> > Thinking about it now, it seems pretty obvious >> >> > that (2) is >> >> > > >>> the most >> >> > > >>> > feasible way to avoid complications across all >> >> > current and >> >> > > >>> future SDKs >> >> > > >>> > for key encodings. Still, it is odd that the >> >> > Proto contains coder >> >> > > >>> > information which is not usable. >> >> > > >>> > >> >> > > >>> > What do you think? >> >> > > >>> > >> >> > > >>> > >> >> > > >>> > Thanks, >> >> > > >>> > Max >> >> > > >>> > >> >> > > >>> > >> >> > > >>> > [0] It is possible to restart the pipeline and >> >> > repartition the >> >> > > >>> > checkpointed data. >> >> > > >>> > [1] >> >> > > >>> > >> >> > > >>> >> >> > >> https://github.com/apache/beam/blob/c39752af5391fe698a2b4f1489c187ddd4d604c0/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java#L682 >> >> > > >>> > [2] >> https://issues.apache.org/jira/browse/BEAM-8157 >> >> > > >>> >> >> > >> >
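Two closing sketches against the Beam Java coder classes may help make the options concrete. First, why option (1) lines the two sides up: once the proto says the key coder is LengthPrefixCoder[<sdk coder>], the Runner can instantiate it as LengthPrefixCoder[ByteArrayCoder] and still reproduce the exact key bytes (StringUtf8Coder stands in here for any coder the Runner cannot instantiate):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.util.Arrays;
    import org.apache.beam.sdk.coders.ByteArrayCoder;
    import org.apache.beam.sdk.coders.LengthPrefixCoder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;

    public class Option1Sketch {
      public static void main(String[] args) throws Exception {
        // SDK Harness: the key coder from the rewritten proto is LengthPrefixCoder[<sdk coder>].
        ByteArrayOutputStream sdkKey = new ByteArrayOutputStream();
        LengthPrefixCoder.of(StringUtf8Coder.of()).encode("key", sdkKey);

        // Runner: the same proto entry, instantiated as LengthPrefixCoder[ByteArrayCoder],
        // has the identical wire format, so the key bytes round-trip unchanged.
        LengthPrefixCoder<byte[]> runnerCoder = LengthPrefixCoder.of(ByteArrayCoder.of());
        byte[] payload = runnerCoder.decode(new ByteArrayInputStream(sdkKey.toByteArray()));
        ByteArrayOutputStream runnerKey = new ByteArrayOutputStream();
        runnerCoder.encode(payload, runnerKey);

        System.out.println(Arrays.equals(sdkKey.toByteArray(), runnerKey.toByteArray())); // true
      }
    }

And second, the context behavior that option (2) and the 2.16.0 footnote rely on: ByteArrayCoder only adds a length prefix in the NESTED context, so encoding the key in the OUTER context passes the SDK's bytes through untouched (this sketch uses the CoderUtils helper and the deprecated-but-present Coder.Context):

    import org.apache.beam.sdk.coders.ByteArrayCoder;
    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.util.CoderUtils;

    public class OuterContextSketch {
      public static void main(String[] args) throws Exception {
        byte[] key = new byte[] {0x6b, 0x65, 0x79};
        byte[] outer = CoderUtils.encodeToByteArray(ByteArrayCoder.of(), key, Coder.Context.OUTER);
        byte[] nested = CoderUtils.encodeToByteArray(ByteArrayCoder.of(), key, Coder.Context.NESTED);
        System.out.println(outer.length);   // 3 -- raw key bytes, no prefix
        System.out.println(nested.length);  // 4 -- var-int length prefix + key bytes
      }
    }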
