Thanks for sharing your thoughts which give me more help to deep
understanding the design of FnAPI, and It make more sense to me.

Great thanks Robert !

Best,
Jincheng


Robert Bradshaw <rober...@google.com> 于2019年11月12日周二 上午2:10写道:

> On Fri, Nov 8, 2019 at 10:04 PM jincheng sun <sunjincheng...@gmail.com>
> wrote:
> >
> > > Let us first define what are "standard coders". Usually it should be
> the coders defined in the Proto. However, personally I think the coders
> defined in the Java ModelCoders [1] seems more appropriate. The reason is
> that for a coder which has already appeared in Proto and still not added to
> the Java ModelCoders, it's always replaced by the runner with
> LengthPrefixCoder[ByteArrayCoder] and so the SDK harness will decode the
> data with LengthPrefixCoder[ByteArrayCoder] instead of the actual coder.
> That's to say, that coder does not still take effect in the SDK harness.
> Only when the coder is added in ModelCoders, it's 'known' and will take
> effect.
> >
> > Correct this point! The coder which is not contained in the Java
> ModelCoders is replaced with LengthPrefixCoder[ByteArrayCoder] at runner
> side and LengthPrefixCoder[CustomCoder] at SDK harness side.
> >
> > The point here is that the runner determines whether it knows the coder
> according to the coders defined in the Java ModelCoders, not the coders
> defined in the proto file. So if taking option 3, the non-standard coders
> which will be wrapped with LengthPrefixCoder should also be determined by
> the coders defined in the Java ModerCoders, not the coders defined in the
> proto file.
>
> Yes.
>
> Both as a matter of principle and pragmatics, it'd be good to avoid
> anything about the model only defined in Java files.
>
> Also, when we say "the runner" we cannot assume it's written in Java.
> While many Java OSS runners share these libraries, The Universal Local
> Runner is written in Python. Dataflow is written (primarily) in C++.
> My hope is that the FnAPI will be stable enough that one can even run
> multiple versions of the Java SDK with the same runner. What matters
> is that (1) if the same URN is used, all runners/SDKs agree on the
> encoding (2) there are certain coders (Windowed, LengthPrefixed, and
> KV come to mind) that all Runners/SDKs are required to understand, and
> (3) runners properly coerce coders they do not understand into coders
> that they do if they need to pull out and act on the bytes. The more
> coders the runner/SDK understands, the less often it needs to do this.
>
> > jincheng sun <sunjincheng...@gmail.com>于2019年11月9日 周六12:26写道:
> >>
> >> Hi Robert Bradshaw,
> >>
> >> Thanks a lot for the explanation. Very interesting topic!
> >>
> >> Let us first define what are "standard coders". Usually it should be
> the coders defined in the Proto. However, personally I think the coders
> defined in the Java ModelCoders [1] seems more appropriate. The reason is
> that for a coder which has already appeared in Proto and still not added to
> the Java ModelCoders, it's always replaced by the runner with
> LengthPrefixCoder[ByteArrayCoder] and so the SDK harness will decode the
> data with LengthPrefixCoder[ByteArrayCoder] instead of the actual coder.
> That's to say, that coder does not still take effect in the SDK harness.
> Only when the coder is added in ModelCoders, it's 'known' and will take
> effect.
> >>
> >> So if we take option 3, the non-standard coders which will be wrapped
> with LengthPrefixCoder should be synced with the coders defined in the Java
> ModerCoders. (From this point of view, option 1 seems more clean!)
> >>
> >> Please correct me if I missed something. Thanks a lot!
> >>
> >> Best,
> >> Jincheng
> >>
> >> [1]
> https://github.com/apache/beam/blob/01726e9c62313749f9ea7c93063a1178abd1a8db/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java#L59
> >>
> >> Robert Burke <rob...@frantil.com> 于2019年11月9日周六 上午8:46写道:
> >>>
> >>> And by "I wasn't clear" I meant "I misread the options".
> >>>
> >>> On Fri, Nov 8, 2019, 4:14 PM Robert Burke <rob...@frantil.com> wrote:
> >>>>
> >>>> Reading back, I wasn't clear: the Go SDK does Option (1), putting the
> LP explicitly during encoding [1] for the runner proto, and explicitly
> expects LPs to contain a custom coder URN on decode for execution [2].
> (Modulo an old bug in Dataflow where the urn was empty)
> >>>>
> >>>>
> >>>> [1]
> https://github.com/apache/beam/blob/4364a214dfe6d8d5dd84b1bb91d579f466492ca5/sdks/go/pkg/beam/core/runtime/graphx/coder.go#L348
> >>>> [2]
> https://github.com/apache/beam/blob/4364a214dfe6d8d5dd84b1bb91d579f466492ca5/sdks/go/pkg/beam/core/runtime/graphx/coder.go#L219
> >>>>
> >>>>
> >>>> On Fri, Nov 8, 2019, 10:28 AM Robert Bradshaw <rober...@google.com>
> wrote:
> >>>>>
> >>>>> On Fri, Nov 8, 2019 at 2:09 AM jincheng sun <
> sunjincheng...@gmail.com> wrote:
> >>>>> >
> >>>>> > Hi,
> >>>>> >
> >>>>> > Sorry for my late reply. It seems the conclusion has been reached.
> I just want to share my personal thoughts.
> >>>>> >
> >>>>> > Generally, both option 1 and 3 make sense to me.
> >>>>> >
> >>>>> > >> The key concept here is not "standard coder" but "coder that the
> >>>>> > >> runner does not understand." This knowledge is only in the
> runner.
> >>>>> > >> Also has the downside of (2).
> >>>>> >
> >>>>> > >Yes, I had assumed "non-standard" and "unknown" are the same, but
> the
> >>>>> > >latter can be a subset of the former, i.e. if a Runner does not
> support
> >>>>> > >all of the standard coders for some reason.
> >>>>> >
> >>>>> > I'm also assume that "non-standard" and "unknown" are the same.
> Currently, in the runner side[1] it
> >>>>> > decides whether the coder is unknown(wrap with length prefix
> coder) according to whether the coder is among
> >>>>> > the standard coders. It will not communicate with harness to make
> this decision.
> >>>>> >
> >>>>> > So, from my point of view, we can update the PR according to
> option 1 or 3.
> >>>>> >
> >>>>> > [1]
> https://github.com/apache/beam/blob/66a67e6580b93906038b31ae7070204cec90999c/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java#L62
> >>>>>
> >>>>> That list is populated in Java code [1] and has typically been a
> >>>>> subset of what is in the proto file. Things like StringUtf8Coder and
> >>>>> DoubleCoder have been added at different times to different SDKs and
> >>>>> Runners, sometimes long after the URN is in the proto. Having to keep
> >>>>> this list synchronized (and versioned) would be a regression.
> >>>>>
> >>>>> [1]
> https://github.com/apache/beam/blob/release-2.17.0/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java
> >>>>>
> >>>>> The PR taking approach (1) looks good at a first glance (I see others
> >>>>> are reviewing it). Thanks.
> >>>>>
> >>>>> > Maximilian Michels <m...@apache.org> 于2019年11月8日周五 上午3:35写道:
> >>>>> >>
> >>>>> >> > While the Go SDK doesn't yet support a State API, Option 3) is
> what the Go SDK does for all non-standard coders (aka custom coders) anyway.
> >>>>> >>
> >>>>> >> For wire transfer, the Java Runner also adds a LengthPrefixCoder
> for the
> >>>>> >> coder and its subcomponents. The problem is that this is an
> implicit
> >>>>> >> assumption made. In the Proto, we do not have this represented.
> This is
> >>>>> >> why **for state requests**, we end up with a
> >>>>> >> "LengthPrefixCoder[CustomCoder]" on the Runner and a
> "CustomCoder" on
> >>>>> >> the SDK Harness side. Note that the Python Harness does wrap
> unknown
> >>>>> >> coders in a LengthPrefixCoder for transferring regular elements,
> but the
> >>>>> >> LengthPrefixCoder is not preserved for the state requests.
> >>>>> >>
> >>>>> >> In that sense (3) is good because it follows this implicit notion
> of
> >>>>> >> adding a LengthPrefixCoder for wire transfer, but applies it to
> state
> >>>>> >> requests.
> >>>>> >>
> >>>>> >> However, option (1) is most reliable because the
> LengthPrefixCoder is
> >>>>> >> actually in the Proto. So "CustomCoder" will always be
> represented as
> >>>>> >> "LengthPrefixCoder[CustomCoder]", and only standard coders will
> be added
> >>>>> >> without a LengthPrefixCoder.
> >>>>> >>
> >>>>> >> > I'd really like to avoid implicit agreements about how the
> coder that
> >>>>> >> > should be used differs from what's specified in the proto in
> different
> >>>>> >> > contexts.
> >>>>> >>
> >>>>> >> Option (2) would work on top of the existing logic because
> replacing a
> >>>>> >> non-standard coder with a "NOOP coder" would just be used by the
> Runner
> >>>>> >> to produce a serialized version of the key for partitioning. Flink
> >>>>> >> always operates on the serialized key, be it standard or
> non-standard
> >>>>> >> coder. It wouldn't be necessary to change any of the existing wire
> >>>>> >> transfer logic or representation. I understand that it would be
> less
> >>>>> >> ideal, but maybe easier to fix for the release.
> >>>>> >>
> >>>>> >> > The key concept here is not "standard coder" but "coder that the
> >>>>> >> > runner does not understand." This knowledge is only in the
> runner.
> >>>>> >> > Also has the downside of (2).
> >>>>> >>
> >>>>> >> Yes, I had assumed "non-standard" and "unknown" are the same, but
> the
> >>>>> >> latter can be a subset of the former, i.e. if a Runner does not
> support
> >>>>> >> all of the standard coders for some reason.
> >>>>> >>
> >>>>> >> > This means that the wire format that the runner sends for the
> "key" represents the exact same wire format it will receive for state
> requests.
> >>>>> >>
> >>>>> >> The wire format for the entire element is the same. Otherwise we
> >>>>> >> wouldn't be able to process data between the Runner and the SDK
> Harness.
> >>>>> >> However, the problem is that the way the Runner instantiates the
> key
> >>>>> >> coder to partition elements, does not match how the SDK encodes
> the key
> >>>>> >> when it sends a state request to the Runner. Conceptually, those
> two
> >>>>> >> situations should be the same, but in practice they are not.
> >>>>> >>
> >>>>> >>
> >>>>> >> Now that I thought about it again option (1) is probably the most
> >>>>> >> explicit and in that sense cleanest. However, option (3) is kind
> of fair
> >>>>> >> because it would just replicate the implicit LengthPrefixCoder
> behavior
> >>>>> >> we have for general wire transfer also for state requests. Option
> (2) I
> >>>>> >> suppose is the most implicit and runner-specific, should probably
> be
> >>>>> >> avoided in the long run.
> >>>>> >>
> >>>>> >> So I'd probably opt for (1) and I would update the PR[1] rather
> soon
> >>>>> >> because this currently blocks the release, as this is a
> regression from
> >>>>> >> 2.16.0.[2]
> >>>>> >>
> >>>>> >>
> >>>>> >> -Max
> >>>>> >>
> >>>>> >> [1] https://github.com/apache/beam/pull/9997
> >>>>> >> [2] (In 2.16.0 it worked for Python because the Runner used a
> >>>>> >> ByteArrayCoder with the OUTER encoding context for the key which
> was
> >>>>> >> basically option (2). Only problem that, for standard coders the
> Java
> >>>>> >> SDK Harness produced non-matching state request keys, due to it
> using
> >>>>> >> the NESTED context.)
> >>>>> >>
> >>>>> >> On 07.11.19 18:01, Luke Cwik wrote:
> >>>>> >> >
> >>>>> >> >
> >>>>> >> > On Thu, Nov 7, 2019 at 8:22 AM Robert Bradshaw <
> rober...@google.com
> >>>>> >> > <mailto:rober...@google.com>> wrote:
> >>>>> >> >
> >>>>> >> >     On Thu, Nov 7, 2019 at 6:26 AM Maximilian Michels <
> m...@apache.org
> >>>>> >> >     <mailto:m...@apache.org>> wrote:
> >>>>> >> >      >
> >>>>> >> >      > Thanks for the feedback thus far. Some more comments:
> >>>>> >> >      >
> >>>>> >> >      > > Instead, the runner knows ahead of time that it
> >>>>> >> >      > > will need to instantiate this coder, and should update
> the bundle
> >>>>> >> >      > > processor to specify
> KvCoder<LengthPrefixCoder<CustomCoder>,
> >>>>> >> >      > > VarIntCoder> as the coder so both can pull it out in a
> >>>>> >> >     consistent way.
> >>>>> >> >      >
> >>>>> >> >      > By "update the bundle processor", do you mean modifying
> the
> >>>>> >> >      > ProcessBundleDescriptor's BagUserState with the correct
> key coder?
> >>>>> >> >      > Conceptually that is possible, but the current
> implementation
> >>>>> >> >     does not
> >>>>> >> >      > allow for this to happen:
> >>>>> >> >      >
> >>>>> >> >
> https://github.com/apache/beam/blob/076a037e53ca39b61c1dfeb580527bc8d0371dc1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java#L284
> >>>>> >> >      > It enforces ByteString which does not tell the SDK
> Harness anything
> >>>>> >> >      > about the desired encoding.
> >>>>> >> >
> >>>>> >> >     I meant update the BundleProcessDescriptor proto that is
> sent to the
> >>>>> >> >     SDK
> >>>>> >> >
> https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/beam_fn_api.proto#L140
> ,
> >>>>> >> >     essentially option (1).
> >>>>> >> >
> >>>>> >> >
> >>>>> >> > For clarity, the "key" coder is specified by the stateful
> ParDo's main
> >>>>> >> > input PCollection. This means that the ProcessBundleDescriptor
> should
> >>>>> >> > have something that has the length prefix as part of the remote
> grpc
> >>>>> >> > port specification AND the PCollection that follows it which is
> the main
> >>>>> >> > input for the stateful ParDo. This means that the wire format
> that the
> >>>>> >> > runner sends for the "key" represents the exact same wire
> format it will
> >>>>> >> > receive for state requests.
> >>>>> >> >
> >>>>> >> > I see what you mean Max,
> >>>>> >> >
> https://github.com/apache/beam/blob/076a037e53ca39b61c1dfeb580527bc8d0371dc1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java#L284
> >>>>> >> > could change to represent the actual coder (whether it be
> >>>>> >> > LengthPrefixCoder<BytesCoder> or some other model coder
> combination).
> >>>>> >> > Currently that logic assumes that the runner can perform the
> backwards
> >>>>> >> > mapping by decoding the bytestring with the appropriate coder.
> >>>>> >> >
> >>>>> >> >      > Since the above does not seem feasible, I see the
> following options:
> >>>>> >> >      >
> >>>>> >> >      > (1) Modify the pipeline Proto before the translation and
> wrap a
> >>>>> >> >      > LengthPrefixCoder around non-standard key coders for
> stateful
> >>>>> >> >      > transforms. This would change the encoding for the entire
> >>>>> >> >     element, to be
> >>>>> >> >      > sure that the key coder for state requests contains a
> >>>>> >> >     LengthPrefixCoder
> >>>>> >> >      > for state requests from the SDK Harness. Not optimal.
> >>>>> >> >
> >>>>> >> >     Yes. The contract should be that both the runner and SDK
> use the
> >>>>> >> >     coders that are specified in the proto. The runner controls
> the proto,
> >>>>> >> >     and should ensure it only sends protos it will be able to
> handle the
> >>>>> >> >     SDK responding to. I'm not seeing why this is sub-optimal.
> >>>>> >> >
> >>>>> >> >      > (2) Add a new method
> WireCoders#instantiateRunnerWireKeyCoder which
> >>>>> >> >      > returns the correct key coder, i.e. for standard coders,
> the concrete
> >>>>> >> >      > coder, and for non-standard coders a ByteArrayCoder. We
> also need to
> >>>>> >> >      > ensure the key encoding on the Runner side is OUTER
> context, to avoid
> >>>>> >> >      > adding a length prefix to the encoded bytes. Basically,
> the
> >>>>> >> >     non-standard
> >>>>> >> >      > coders result in a NOOP coder which does not touch the
> key bytes.
> >>>>> >> >
> >>>>> >> >     I'd really like to avoid implicit agreements about how the
> coder that
> >>>>> >> >     should be used differs from what's specified in the proto
> in different
> >>>>> >> >     contexts.
> >>>>> >> >
> >>>>> >> >      > (3) Patch the Python SDK to ensure non-standard state
> key coders are
> >>>>> >> >      > always wrapped in a LengthPrefixCoder. That way, we can
> keep the
> >>>>> >> >      > existing logic on the Runner side.
> >>>>> >> >
> >>>>> >> >     The key concept here is not "standard coder" but "coder
> that the
> >>>>> >> >     runner does not understand." This knowledge is only in the
> runner.
> >>>>> >> >     Also has the downside of (2).
> >>>>> >> >
> >>>>> >> >      > Option (2) seems like the most practical.
> >>>>> >> >      >
> >>>>> >> >      > -Max
> >>>>> >> >      >
> >>>>> >> >      > On 06.11.19 17:26, Robert Bradshaw wrote:
> >>>>> >> >      > > On Wed, Nov 6, 2019 at 2:55 AM Maximilian Michels
> >>>>> >> >     <m...@apache.org <mailto:m...@apache.org>> wrote:
> >>>>> >> >      > >>
> >>>>> >> >      > >> Let me try to clarify:
> >>>>> >> >      > >>
> >>>>> >> >      > >>> The Coder used for State/Timers in a StatefulDoFn is
> pulled
> >>>>> >> >     out of the
> >>>>> >> >      > >>> input PCollection. If a Runner needs to partition by
> this
> >>>>> >> >     coder, it
> >>>>> >> >      > >>> should ensure the coder of this PCollection matches
> with the
> >>>>> >> >     Coder
> >>>>> >> >      > >>> used to create the serialized bytes that are used for
> >>>>> >> >     partitioning
> >>>>> >> >      > >>> (whether or not this is length-prefixed).
> >>>>> >> >      > >>
> >>>>> >> >      > >> That is essentially what I had assumed when I wrote
> the code. The
> >>>>> >> >      > >> problem is the coder can be "pulled out" in different
> ways.
> >>>>> >> >      > >>
> >>>>> >> >      > >> For example, let's say we have the following Proto
> PCollection
> >>>>> >> >     coder
> >>>>> >> >      > >> with non-standard coder "CustomCoder" as the key
> coder:
> >>>>> >> >      > >>
> >>>>> >> >      > >>     KvCoder<CustomCoder, VarIntCoder>
> >>>>> >> >      > >>
> >>>>> >> >      > >>   From the Runner side, this currently looks like the
> following:
> >>>>> >> >      > >>
> >>>>> >> >      > >>     PCol: KvCoder<LengthPrefixCoder<ByteArrayCoder>,
> VarIntCoder>
> >>>>> >> >      > >>     Key:  LengthPrefixCoder<ByteArrayCoder>
> >>>>> >> >      > >
> >>>>> >> >      > > This is I think where the error is. When If the proto
> references
> >>>>> >> >      > > KvCoder<CustomCoder, VarIntCoder> it should not be
> pulled out as
> >>>>> >> >      > > KvCoder<LengthPrefixCoder<ByteArrayCoder>,
> VarIntCoder>; as that
> >>>>> >> >      > > doesn't have the same encoding. Trying to do
> instantiate such a
> >>>>> >> >     coder
> >>>>> >> >      > > should be an error. Instead, the runner knows ahead of
> time that it
> >>>>> >> >      > > will need to instantiate this coder, and should update
> the bundle
> >>>>> >> >      > > processor to specify
> KvCoder<LengthPrefixCoder<CustomCoder>,
> >>>>> >> >      > > VarIntCoder> as the coder so both can pull it out in a
> >>>>> >> >     consistent way.
> >>>>> >> >      > >
> >>>>> >> >      > > When the coder is
> KvCoder<LengthPrefixCoder<CustomCoder>,
> >>>>> >> >     VarIntCoder>
> >>>>> >> >      > > instantiating it as KvCoder<ByteArrayCoder,
> VarIntCoder> on the
> >>>>> >> >     runner
> >>>>> >> >      > > is of course OK as they do have the same encoding.
> >>>>> >> >      > >
> >>>>> >> >      > >> At the SDK Harness, we have the coder available:
> >>>>> >> >      > >>
> >>>>> >> >      > >>     PCol: KvCoder<CustomCoder, VarIntCoder>
> >>>>> >> >      > >>     Key:  CustomCoder
> >>>>> >> >      > >>
> >>>>> >> >      > >> Currently, when the SDK Harness serializes a key for
> a state
> >>>>> >> >     request,
> >>>>> >> >      > >> the custom coder may happen to add a length prefix,
> or it may
> >>>>> >> >     not. It
> >>>>> >> >      > >> depends on the coder used. The correct behavior would
> be to
> >>>>> >> >     use the same
> >>>>> >> >      > >> representation as on the Runner side.
> >>>>> >> >      > >>
> >>>>> >> >      > >>> Specifically, "We have no way of telling from the
> Runner
> >>>>> >> >     side, if a length prefix has been used or not." seems false
> >>>>> >> >      > >>
> >>>>> >> >      > >> The Runner cannot inspect an unknown coder, it only
> has the
> >>>>> >> >     opaque Proto
> >>>>> >> >      > >> information available which does not allow
> introspection of
> >>>>> >> >     non-standard
> >>>>> >> >      > >> coders. With the current state, the Runner may think
> the coder
> >>>>> >> >     adds a
> >>>>> >> >      > >> length prefix but the Python SDK worker could choose
> to add
> >>>>> >> >     none. This
> >>>>> >> >      > >> produces an inconsistent key encoding. See above.
> >>>>> >> >      > >
> >>>>> >> >      > > I think what's being conflated here is "the Coder has
> been
> >>>>> >> >     wrapped in
> >>>>> >> >      > > a LengthPrefixCoder" vs. "the coder does length
> prefixing."
> >>>>> >> >     These are
> >>>>> >> >      > > two orthogonal concepts. The runner in general only
> knows the
> >>>>> >> >     former.
> >>>>> >> >      > >
> >>>>> >> >      > >> It looks like the key encoding for state requests on
> the
> >>>>> >> >     Python SDK
> >>>>> >> >      > >> Harness side is broken. For transferring elements of a
> >>>>> >> >     PCollection, the
> >>>>> >> >      > >> coders are obviously working correctly, but for
> encoding
> >>>>> >> >     solely the key
> >>>>> >> >      > >> of an element, there is a consistency issue.
> >>>>> >> >      > >>
> >>>>> >> >      > >>
> >>>>> >> >      > >> -Max
> >>>>> >> >      > >>
> >>>>> >> >      > >> On 06.11.19 05:35, Kenneth Knowles wrote:
> >>>>> >> >      > >>> Specifically, "We have no way of telling from the
> Runner
> >>>>> >> >     side, if a
> >>>>> >> >      > >>> length prefix has been used or not." seems false.
> The runner
> >>>>> >> >     has all the
> >>>>> >> >      > >>> information since length prefix is a model coder.
> Didn't we
> >>>>> >> >     agree that
> >>>>> >> >      > >>> all coders should be self-delimiting in runner/SDK
> interactions,
> >>>>> >> >      > >>> requiring length-prefix only when there is an opaque
> or
> >>>>> >> >     dynamic-length
> >>>>> >> >      > >>> value? I assume you mean that at runtime the worker
> for a
> >>>>> >> >     given engine
> >>>>> >> >      > >>> does not know?
> >>>>> >> >      > >>>
> >>>>> >> >      > >>> Kenn
> >>>>> >> >      > >>>
> >>>>> >> >      > >>> On Tue, Nov 5, 2019 at 3:19 PM Luke Cwik <
> lc...@google.com
> >>>>> >> >     <mailto:lc...@google.com>
> >>>>> >> >      > >>> <mailto:lc...@google.com <mailto:lc...@google.com>>>
> wrote:
> >>>>> >> >      > >>>
> >>>>> >> >      > >>>      +1 to what Robert said.
> >>>>> >> >      > >>>
> >>>>> >> >      > >>>      On Tue, Nov 5, 2019 at 2:36 PM Robert Bradshaw
> >>>>> >> >     <rober...@google.com <mailto:rober...@google.com>
> >>>>> >> >      > >>>      <mailto:rober...@google.com
> >>>>> >> >     <mailto:rober...@google.com>>> wrote:
> >>>>> >> >      > >>>
> >>>>> >> >      > >>>          The Coder used for State/Timers in a
> StatefulDoFn is
> >>>>> >> >     pulled out
> >>>>> >> >      > >>>          of the
> >>>>> >> >      > >>>          input PCollection. If a Runner needs to
> partition by
> >>>>> >> >     this coder, it
> >>>>> >> >      > >>>          should ensure the coder of this PCollection
> matches
> >>>>> >> >     with the Coder
> >>>>> >> >      > >>>          used to create the serialized bytes that
> are used
> >>>>> >> >     for partitioning
> >>>>> >> >      > >>>          (whether or not this is length-prefixed).
> >>>>> >> >      > >>>
> >>>>> >> >      > >>>          Concretely, the graph looks like
> >>>>> >> >      > >>>
> >>>>> >> >      > >>>
> >>>>> >> >      > >>>          Runner                          SDK Harness
> >>>>> >> >      > >>>
> >>>>> >> >      > >>>          WriteToGbk
> >>>>> >> >      > >>>               |
> >>>>> >> >      > >>>          ReadFromGbk
> >>>>> >> >      > >>>               |
> >>>>> >> >      > >>>          RunnerMapFn.processKeyValue(key, value)
> >>>>> >> >      > >>>               |
> >>>>> >> >      > >>>               WriteToDataChannel
> >>>>> >> >      > >>>                       ------------------------>
> >>>>> >> >      > >>>                            ReadFromDataChannel
> >>>>> >> >      > >>>                                          |
> >>>>> >> >      > >>>                                      (pcIn)
> >>>>> >> >      > >>>                                          |
> >>>>> >> >      > >>>
>  MyStatefulDoFn.process(key, value)
> >>>>> >> >      > >>>
> >>>>> >> >      > >>>          Now the (key part of the) Coder of pcIn,
> which comes
> >>>>> >> >     from the proto
> >>>>> >> >      > >>>          that the Runner sent to the SDK, must match
> the (key
> >>>>> >> >     part of the)
> >>>>> >> >      > >>>          encoding used in WriteToGbk and
> ReadFromGbk. If a
> >>>>> >> >     LenthPrefix is
> >>>>> >> >      > >>>          added
> >>>>> >> >      > >>>          in one spot, it must be added in the other.
> >>>>> >> >      > >>>
> >>>>> >> >      > >>>
> >>>>> >> >      > >>>          [1]
> >>>>> >> >      > >>>
> >>>>> >> >
> https://github.com/apache/beam/blob/release-2.17.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L1183
> >>>>> >> >      > >>>
> >>>>> >> >      > >>>          On Tue, Nov 5, 2019 at 1:25 PM Maximilian
> Michels
> >>>>> >> >      > >>>          <m...@apache.org <mailto:m...@apache.org>
> >>>>> >> >     <mailto:m...@apache.org <mailto:m...@apache.org>>> wrote:
> >>>>> >> >      > >>>           >
> >>>>> >> >      > >>>           > Hi,
> >>>>> >> >      > >>>           >
> >>>>> >> >      > >>>           > I wanted to get your opinion on
> something that I
> >>>>> >> >     have been
> >>>>> >> >      > >>>          struggling
> >>>>> >> >      > >>>           > with. It is about the coders for state
> requests
> >>>>> >> >     in portable
> >>>>> >> >      > >>>          pipelines.
> >>>>> >> >      > >>>           >
> >>>>> >> >      > >>>           > In contrast to "classic" Beam, the
> Runner is not
> >>>>> >> >     guaranteed
> >>>>> >> >      > >>>          to know
> >>>>> >> >      > >>>           > which coder is used by the SDK. If the
> SDK
> >>>>> >> >     happens to use a
> >>>>> >> >      > >>>          standard
> >>>>> >> >      > >>>           > coder (also known as model coder), we
> will also
> >>>>> >> >     have it
> >>>>> >> >      > >>>          available at the
> >>>>> >> >      > >>>           > Runner, i.e. if the Runner is written in
> one of
> >>>>> >> >     the SDK
> >>>>> >> >      > >>>          languages (e.g.
> >>>>> >> >      > >>>           > Java). However, when we do not have a
> standard
> >>>>> >> >     coder, we just
> >>>>> >> >      > >>>          treat the
> >>>>> >> >      > >>>           > data from the SDK as a blob and just
> pass it
> >>>>> >> >     around as bytes.
> >>>>> >> >      > >>>           >
> >>>>> >> >      > >>>           > Problem
> >>>>> >> >      > >>>           > =======
> >>>>> >> >      > >>>           >
> >>>>> >> >      > >>>           > In the case of state requests which the
> SDK
> >>>>> >> >     Harness authors
> >>>>> >> >      > >>>          to the
> >>>>> >> >      > >>>           > Runner, we would like for the key
> associated with
> >>>>> >> >     the state
> >>>>> >> >      > >>>          request to
> >>>>> >> >      > >>>           > match the key of the element which led to
> >>>>> >> >     initiating the
> >>>>> >> >      > >>>          state request.
> >>>>> >> >      > >>>           >
> >>>>> >> >      > >>>           > Example:
> >>>>> >> >      > >>>           >
> >>>>> >> >      > >>>           > Runner                 SDK Harness
> >>>>> >> >      > >>>           > ------                 -----------
> >>>>> >> >      > >>>           >
> >>>>> >> >      > >>>           > KV["key","value"]  --> Process Element
> >>>>> >> >      > >>>           >                                |
> >>>>> >> >      > >>>           > LookupState("key") <-- Request state of
> "key"
> >>>>> >> >      > >>>           >          |
> >>>>> >> >      > >>>           >     State["key"]    --> Receive state
> >>>>> >> >      > >>>           >
> >>>>> >> >      > >>>           >
> >>>>> >> >      > >>>           > For stateful DoFns, the Runner
> partitions the
> >>>>> >> >     data based on
> >>>>> >> >      > >>>          the key. In
> >>>>> >> >      > >>>           > Flink, this partitioning must not change
> during
> >>>>> >> >     the lifetime of a
> >>>>> >> >      > >>>           > pipeline because the checkpointing
> otherwise
> >>>>> >> >     breaks[0]. The
> >>>>> >> >      > >>>          key is
> >>>>> >> >      > >>>           > extracted from the element and stored
> encoded.
> >>>>> >> >      > >>>           >
> >>>>> >> >      > >>>           > If we have a standard coder, it is
> basically the
> >>>>> >> >     same as in the
> >>>>> >> >      > >>>           > "classic" Runner which takes the key and
> >>>>> >> >     serializes it.
> >>>>> >> >      > >>>          However, when we
> >>>>> >> >      > >>>           > have an SDK-specific coder, we basically
> do not
> >>>>> >> >     know how it
> >>>>> >> >      > >>>          encodes. So
> >>>>> >> >      > >>>           > far, we have been using the coder
> instantiated
> >>>>> >> >     from the
> >>>>> >> >      > >>>          Proto, which is
> >>>>> >> >      > >>>           > basically a
> LengthPrefixCoder[ByteArrayCoder] or
> >>>>> >> >     similar[1].
> >>>>> >> >      > >>>          We have had
> >>>>> >> >      > >>>           > problems with this because the key
> encoding of
> >>>>> >> >     Java SDK state
> >>>>> >> >      > >>>          requests
> >>>>> >> >      > >>>           > did not match the key encoding on the
> Runner side
> >>>>> >> >     [2]. In an
> >>>>> >> >      > >>>          attempt to
> >>>>> >> >      > >>>           > fix those, it is now partly broken for
> portable
> >>>>> >> >     Python pipelines.
> >>>>> >> >      > >>>           > Partly, because it "only" affects
> non-standard
> >>>>> >> >     coders.
> >>>>> >> >      > >>>           >
> >>>>> >> >      > >>>           > Non-standard coders yield the
> aforementioned
> >>>>> >> >      > >>>           > LengthPrefixCoder[ByteArrayCoder]. Now,
> following
> >>>>> >> >     the usual
> >>>>> >> >      > >>>          encoding
> >>>>> >> >      > >>>           > scheme, we would simply encode the key
> using this
> >>>>> >> >     coder.
> >>>>> >> >      > >>>          However, for
> >>>>> >> >      > >>>           > state requests, the Python SDK leaves
> out the
> >>>>> >> >     length prefix
> >>>>> >> >      > >>>          for certain
> >>>>> >> >      > >>>           > coders, e.g. for primitives like int or
> byte. It
> >>>>> >> >     is possible
> >>>>> >> >      > >>>          that one
> >>>>> >> >      > >>>           > coder uses a length prefix, while another
> >>>>> >> >     doesn't. We have no
> >>>>> >> >      > >>>          way of
> >>>>> >> >      > >>>           > telling from the Runner side, if a
> length prefix
> >>>>> >> >     has been
> >>>>> >> >      > >>>          used or not.
> >>>>> >> >      > >>>           > This results in the keys to not match on
> the
> >>>>> >> >     Runner side and the
> >>>>> >> >      > >>>           > partitioning to be broken.
> >>>>> >> >      > >>>           >
> >>>>> >> >      > >>>           >
> >>>>> >> >      > >>>           > How to solve this?
> >>>>> >> >      > >>>           > ==================
> >>>>> >> >      > >>>           >
> >>>>> >> >      > >>>           > (1) Should this simply be fixed on the
> Python SDK
> >>>>> >> >     side? One
> >>>>> >> >      > >>>          fix would be
> >>>>> >> >      > >>>           > to always append a length prefix to the
> key in state
> >>>>> >> >      > >>>          requests, even for
> >>>>> >> >      > >>>           > primitive coders like VarInt which do
> not use one.
> >>>>> >> >      > >>>           >
> >>>>> >> >      > >>>           > OR
> >>>>> >> >      > >>>           >
> >>>>> >> >      > >>>           > (2) Should the Runner detect that a
> non-standard
> >>>>> >> >     coder is
> >>>>> >> >      > >>>          used? If so,
> >>>>> >> >      > >>>           > it should just pass the bytes from the
> SDK
> >>>>> >> >     Harness and never
> >>>>> >> >      > >>>          make an
> >>>>> >> >      > >>>           > attempt to construct a coder based on
> the Proto.
> >>>>> >> >      > >>>           >
> >>>>> >> >      > >>>           >
> >>>>> >> >      > >>>           > Thinking about it now, it seems pretty
> obvious
> >>>>> >> >     that (2) is
> >>>>> >> >      > >>>          the most
> >>>>> >> >      > >>>           > feasible way to avoid complications
> across all
> >>>>> >> >     current and
> >>>>> >> >      > >>>          future SDKs
> >>>>> >> >      > >>>           > for key encodings. Still, it is odd that
> the
> >>>>> >> >     Proto contains coder
> >>>>> >> >      > >>>           > information which is not usable.
> >>>>> >> >      > >>>           >
> >>>>> >> >      > >>>           > What do you think?
> >>>>> >> >      > >>>           >
> >>>>> >> >      > >>>           >
> >>>>> >> >      > >>>           > Thanks,
> >>>>> >> >      > >>>           > Max
> >>>>> >> >      > >>>           >
> >>>>> >> >      > >>>           >
> >>>>> >> >      > >>>           > [0] It is possible to restart the
> pipeline and
> >>>>> >> >     repartition the
> >>>>> >> >      > >>>           > checkpointed data.
> >>>>> >> >      > >>>           > [1]
> >>>>> >> >      > >>>           >
> >>>>> >> >      > >>>
> >>>>> >> >
> https://github.com/apache/beam/blob/c39752af5391fe698a2b4f1489c187ddd4d604c0/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java#L682
> >>>>> >> >      > >>>           > [2]
> https://issues.apache.org/jira/browse/BEAM-8157
> >>>>> >> >      > >>>
> >>>>> >> >
>

Reply via email to