Reading back, I wasn't clear: the Go SDK does Option (1), putting the LP
explicitly during encoding [1] for the runner proto, and explicitly expects
LPs to contain a custom coder URN on decode for execution [2] (modulo an
old bug in Dataflow where the URN was empty).


[1]
https://github.com/apache/beam/blob/4364a214dfe6d8d5dd84b1bb91d579f466492ca5/sdks/go/pkg/beam/core/runtime/graphx/coder.go#L348
[2]
https://github.com/apache/beam/blob/4364a214dfe6d8d5dd84b1bb91d579f466492ca5/sdks/go/pkg/beam/core/runtime/graphx/coder.go#L219
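
To illustrate why it matters that the LP is explicit in the proto, here is a
minimal standalone Python sketch. It is not Beam code: encode_varint,
custom_encode and length_prefixed are hypothetical helpers, and the sketch
assumes the length prefix is a base-128 varint followed by the payload. It
only shows that the key bytes match when both sides apply the same wrapping.

```python
def encode_varint(n: int) -> bytes:
    """Encode a non-negative int as a base-128 varint (7 bits per byte)."""
    if n < 0:
        raise ValueError("length must be non-negative")
    out = bytearray()
    while True:
        low_bits = n & 0x7F
        n >>= 7
        if n:
            out.append(low_bits | 0x80)  # continuation bit: more bytes follow
        else:
            out.append(low_bits)
            return bytes(out)


def custom_encode(key: str) -> bytes:
    """Hypothetical stand-in for an SDK-specific (non-standard) key coder."""
    return key.encode("utf-8")


def length_prefixed(payload: bytes) -> bytes:
    """Assumed LP behavior: varint length followed by the payload bytes."""
    return encode_varint(len(payload)) + payload


# Key bytes the runner partitions on when the proto says LP[CustomCoder].
runner_key = length_prefixed(custom_encode("key"))

# Key bytes in a state request if the SDK encodes with the bare custom coder.
bare_state_key = custom_encode("key")

# Key bytes in a state request if the SDK uses the coder from the proto as-is.
wrapped_state_key = length_prefixed(custom_encode("key"))

print(runner_key == bare_state_key)     # False
print(runner_key == wrapped_state_key)  # True
```

With the wrapping spelled out in the proto, neither side has to guess whether
the other one added a prefix.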


On Fri, Nov 8, 2019, 10:28 AM Robert Bradshaw <rober...@google.com> wrote:

> On Fri, Nov 8, 2019 at 2:09 AM jincheng sun <sunjincheng...@gmail.com>
> wrote:
> >
> > Hi,
> >
> > Sorry for my late reply. It seems the conclusion has been reached. I
> > just want to share my personal thoughts.
> >
> > Generally, both option 1 and 3 make sense to me.
> >
> > >> The key concept here is not "standard coder" but "coder that the
> > >> runner does not understand." This knowledge is only in the runner.
> > >> Also has the downside of (2).
> >
> > >Yes, I had assumed "non-standard" and "unknown" are the same, but the
> > >latter can be a subset of the former, i.e. if a Runner does not support
> > >all of the standard coders for some reason.
> >
> > I also assume that "non-standard" and "unknown" are the same.
> > Currently, on the runner side [1], whether a coder is unknown (and gets
> > wrapped with a length prefix coder) is decided by whether the coder is
> > among the standard coders. The runner does not communicate with the
> > harness to make this decision.
> >
> > So, from my point of view, we can update the PR according to option 1
> > or 3.
> >
> > [1]
> https://github.com/apache/beam/blob/66a67e6580b93906038b31ae7070204cec90999c/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java#L62
>
> That list is populated in Java code [1] and has typically been a
> subset of what is in the proto file. Things like StringUtf8Coder and
> DoubleCoder have been added at different times to different SDKs and
> Runners, sometimes long after the URN is in the proto. Having to keep
> this list synchronized (and versioned) would be a regression.
>
> [1]
> https://github.com/apache/beam/blob/release-2.17.0/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java
>
> The PR taking approach (1) looks good at a first glance (I see others
> are reviewing it). Thanks.
>
> > On Fri, Nov 8, 2019 at 3:35 AM Maximilian Michels <m...@apache.org> wrote:
> >>
> >> > While the Go SDK doesn't yet support a State API, Option 3) is what
> >> > the Go SDK does for all non-standard coders (aka custom coders) anyway.
> >>
> >> For wire transfer, the Java Runner also adds a LengthPrefixCoder for the
> >> coder and its subcomponents. The problem is that this is an implicit
> >> assumption made. In the Proto, we do not have this represented. This is
> >> why **for state requests**, we end up with a
> >> "LengthPrefixCoder[CustomCoder]" on the Runner and a "CustomCoder" on
> >> the SDK Harness side. Note that the Python Harness does wrap unknown
> >> coders in a LengthPrefixCoder for transferring regular elements, but the
> >> LengthPrefixCoder is not preserved for the state requests.
> >>
> >> In that sense (3) is good because it follows this implicit notion of
> >> adding a LengthPrefixCoder for wire transfer, but applies it to state
> >> requests.
> >>
> >> However, option (1) is most reliable because the LengthPrefixCoder is
> >> actually in the Proto. So "CustomCoder" will always be represented as
> >> "LengthPrefixCoder[CustomCoder]", and only standard coders will be added
> >> without a LengthPrefixCoder.
> >>
> >> > I'd really like to avoid implicit agreements about how the coder that
> >> > should be used differs from what's specified in the proto in different
> >> > contexts.
> >>
> >> Option (2) would work on top of the existing logic because replacing a
> >> non-standard coder with a "NOOP coder" would just be used by the Runner
> >> to produce a serialized version of the key for partitioning. Flink
> >> always operates on the serialized key, be it standard or non-standard
> >> coder. It wouldn't be necessary to change any of the existing wire
> >> transfer logic or representation. I understand that it would be less
> >> ideal, but maybe easier to fix for the release.
> >>
> >> > The key concept here is not "standard coder" but "coder that the
> >> > runner does not understand." This knowledge is only in the runner.
> >> > Also has the downside of (2).
> >>
> >> Yes, I had assumed "non-standard" and "unknown" are the same, but the
> >> latter can be a subset of the former, i.e. if a Runner does not support
> >> all of the standard coders for some reason.
> >>
> >> > This means that the wire format that the runner sends for the "key"
> >> > represents the exact same wire format it will receive for state requests.
> >>
> >> The wire format for the entire element is the same. Otherwise we
> >> wouldn't be able to process data between the Runner and the SDK Harness.
> >> However, the problem is that the way the Runner instantiates the key
> >> coder to partition elements, does not match how the SDK encodes the key
> >> when it sends a state request to the Runner. Conceptually, those two
> >> situations should be the same, but in practice they are not.
> >>
> >>
> >> Now that I have thought about it again, option (1) is probably the most
> >> explicit and in that sense the cleanest. However, option (3) is fair
> >> too, because it would just replicate the implicit LengthPrefixCoder
> >> behavior we have for general wire transfer for state requests as well.
> >> Option (2), I suppose, is the most implicit and runner-specific, and
> >> should probably be avoided in the long run.
> >>
> >> So I'd probably opt for (1) and I would update the PR[1] rather soon
> >> because this currently blocks the release, as this is a regression from
> >> 2.16.0.[2]
> >>
> >>
> >> -Max
> >>
> >> [1] https://github.com/apache/beam/pull/9997
> >> [2] (In 2.16.0 it worked for Python because the Runner used a
> >> ByteArrayCoder with the OUTER encoding context for the key, which was
> >> basically option (2). The only problem was that, for standard coders,
> >> the Java SDK Harness produced non-matching state request keys because
> >> it used the NESTED context.)
> >>
> >> On 07.11.19 18:01, Luke Cwik wrote:
> >> >
> >> >
> >> > On Thu, Nov 7, 2019 at 8:22 AM Robert Bradshaw <rober...@google.com> wrote:
> >> >
> >> >     On Thu, Nov 7, 2019 at 6:26 AM Maximilian Michels <m...@apache.org> wrote:
> >> >      >
> >> >      > Thanks for the feedback thus far. Some more comments:
> >> >      >
> >> >      > > Instead, the runner knows ahead of time that it
> >> >      > > will need to instantiate this coder, and should update the
> bundle
> >> >      > > processor to specify KvCoder<LengthPrefixCoder<CustomCoder>,
> >> >      > > VarIntCoder> as the coder so both can pull it out in a
> >> >     consistent way.
> >> >      >
> >> >      > By "update the bundle processor", do you mean modifying the
> >> >      > ProcessBundleDescriptor's BagUserState with the correct key
> coder?
> >> >      > Conceptually that is possible, but the current implementation
> >> >     does not
> >> >      > allow for this to happen:
> >> >      >
> >> >
> https://github.com/apache/beam/blob/076a037e53ca39b61c1dfeb580527bc8d0371dc1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java#L284
> >> >      > It enforces ByteString which does not tell the SDK Harness
> anything
> >> >      > about the desired encoding.
> >> >
> >> >     I meant update the ProcessBundleDescriptor proto that is sent to the
> >> >     SDK
> >> >     https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/beam_fn_api.proto#L140,
> >> >     essentially option (1).
> >> >
> >> >
> >> > For clarity, the "key" coder is specified by the stateful ParDo's main
> >> > input PCollection. This means that the ProcessBundleDescriptor should
> >> > have something that has the length prefix as part of the remote grpc
> >> > port specification AND the PCollection that follows it, which is the
> >> > main input for the stateful ParDo. This means that the wire format that
> >> > the runner sends for the "key" represents the exact same wire format it
> >> > will receive for state requests.
> >> >
> >> > I see what you mean Max,
> >> >
> https://github.com/apache/beam/blob/076a037e53ca39b61c1dfeb580527bc8d0371dc1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java#L284
> >> > could change to represent the actual coder (whether it be
> >> > LengthPrefixCoder<BytesCoder> or some other model coder combination).
> >> > Currently that logic assumes that the runner can perform the backwards
> >> > mapping by decoding the bytestring with the appropriate coder.
> >> >
> >> >      > Since the above does not seem feasible, I see the following
> options:
> >> >      >
> >> >      > (1) Modify the pipeline Proto before the translation and wrap a
> >> >      > LengthPrefixCoder around non-standard key coders for stateful
> >> >      > transforms. This would change the encoding for the entire
> >> >     element, to be
> >> >      > sure that the key coder for state requests contains a
> >> >     LengthPrefixCoder
> >> >      > for state requests from the SDK Harness. Not optimal.
> >> >
> >> >     Yes. The contract should be that both the runner and SDK use the
> >> >     coders that are specified in the proto. The runner controls the
> proto,
> >> >     and should ensure it only sends protos it will be able to handle
> the
> >> >     SDK responding to. I'm not seeing why this is sub-optimal.
> >> >
> >> >      > (2) Add a new method WireCoders#instantiateRunnerWireKeyCoder
> which
> >> >      > returns the correct key coder, i.e. for standard coders, the
> concrete
> >> >      > coder, and for non-standard coders a ByteArrayCoder. We also
> need to
> >> >      > ensure the key encoding on the Runner side is OUTER context,
> to avoid
> >> >      > adding a length prefix to the encoded bytes. Basically, the
> >> >     non-standard
> >> >      > coders result in a NOOP coder which does not touch the key
> bytes.
> >> >
> >> >     I'd really like to avoid implicit agreements about how the coder
> that
> >> >     should be used differs from what's specified in the proto in
> different
> >> >     contexts.
> >> >
> >> >      > (3) Patch the Python SDK to ensure non-standard state key
> coders are
> >> >      > always wrapped in a LengthPrefixCoder. That way, we can keep
> the
> >> >      > existing logic on the Runner side.
> >> >
> >> >     The key concept here is not "standard coder" but "coder that the
> >> >     runner does not understand." This knowledge is only in the runner.
> >> >     Also has the downside of (2).
> >> >
> >> >      > Option (2) seems like the most practical.
> >> >      >
> >> >      > -Max
> >> >      >
> >> >      > On 06.11.19 17:26, Robert Bradshaw wrote:
> >> >      > > On Wed, Nov 6, 2019 at 2:55 AM Maximilian Michels <m...@apache.org> wrote:
> >> >      > >>
> >> >      > >> Let me try to clarify:
> >> >      > >>
> >> >      > >>> The Coder used for State/Timers in a StatefulDoFn is pulled
> >> >     out of the
> >> >      > >>> input PCollection. If a Runner needs to partition by this
> >> >     coder, it
> >> >      > >>> should ensure the coder of this PCollection matches with
> the
> >> >     Coder
> >> >      > >>> used to create the serialized bytes that are used for
> >> >     partitioning
> >> >      > >>> (whether or not this is length-prefixed).
> >> >      > >>
> >> >      > >> That is essentially what I had assumed when I wrote the
> code. The
> >> >      > >> problem is the coder can be "pulled out" in different ways.
> >> >      > >>
> >> >      > >> For example, let's say we have the following Proto PCollection
> >> >      > >> coder with non-standard coder "CustomCoder" as the key coder:
> >> >      > >>
> >> >      > >>     KvCoder<CustomCoder, VarIntCoder>
> >> >      > >>
> >> >      > >>   From the Runner side, this currently looks like the following:
> >> >      > >>
> >> >      > >>     PCol: KvCoder<LengthPrefixCoder<ByteArrayCoder>, VarIntCoder>
> >> >      > >>     Key:  LengthPrefixCoder<ByteArrayCoder>
> >> >      > >
> >> >      > > This is, I think, where the error is. If the proto references
> >> >      > > KvCoder<CustomCoder, VarIntCoder>, it should not be pulled out as
> >> >      > > KvCoder<LengthPrefixCoder<ByteArrayCoder>, VarIntCoder>, as that
> >> >      > > doesn't have the same encoding. Trying to instantiate such a coder
> >> >      > > should be an error. Instead, the runner knows ahead of time that it
> >> >      > > will need to instantiate this coder, and should update the bundle
> >> >      > > processor to specify KvCoder<LengthPrefixCoder<CustomCoder>,
> >> >      > > VarIntCoder> as the coder so both can pull it out in a
> >> >      > > consistent way.
> >> >      > >
> >> >      > > When the coder is KvCoder<LengthPrefixCoder<CustomCoder>,
> >> >     VarIntCoder>
> >> >      > > instantiating it as KvCoder<ByteArrayCoder, VarIntCoder> on
> the
> >> >     runner
> >> >      > > is of course OK as they do have the same encoding.
> >> >      > >
> >> >      > >> At the SDK Harness, we have the coder available:
> >> >      > >>
> >> >      > >>     PCol: KvCoder<CustomCoder, VarIntCoder>
> >> >      > >>     Key:  CustomCoder
> >> >      > >>
> >> >      > >> Currently, when the SDK Harness serializes a key for a state
> >> >     request,
> >> >      > >> the custom coder may happen to add a length prefix, or it
> may
> >> >     not. It
> >> >      > >> depends on the coder used. The correct behavior would be to
> >> >     use the same
> >> >      > >> representation as on the Runner side.
> >> >      > >>
> >> >      > >>> Specifically, "We have no way of telling from the Runner
> >> >     side, if a length prefix has been used or not." seems false
> >> >      > >>
> >> >      > >> The Runner cannot inspect an unknown coder, it only has the
> >> >     opaque Proto
> >> >      > >> information available which does not allow introspection of
> >> >     non-standard
> >> >      > >> coders. With the current state, the Runner may think the
> coder
> >> >     adds a
> >> >      > >> length prefix but the Python SDK worker could choose to add
> >> >     none. This
> >> >      > >> produces an inconsistent key encoding. See above.
> >> >      > >
> >> >      > > I think what's being conflated here is "the Coder has been
> >> >     wrapped in
> >> >      > > a LengthPrefixCoder" vs. "the coder does length prefixing."
> >> >     These are
> >> >      > > two orthogonal concepts. The runner in general only knows the
> >> >     former.
> >> >      > >
> >> >      > >> It looks like the key encoding for state requests on the
> >> >     Python SDK
> >> >      > >> Harness side is broken. For transferring elements of a
> >> >     PCollection, the
> >> >      > >> coders are obviously working correctly, but for encoding
> >> >     solely the key
> >> >      > >> of an element, there is a consistency issue.
> >> >      > >>
> >> >      > >>
> >> >      > >> -Max
> >> >      > >>
> >> >      > >> On 06.11.19 05:35, Kenneth Knowles wrote:
> >> >      > >>> Specifically, "We have no way of telling from the Runner
> >> >     side, if a
> >> >      > >>> length prefix has been used or not." seems false. The
> runner
> >> >     has all the
> >> >      > >>> information since length prefix is a model coder. Didn't we
> >> >     agree that
> >> >      > >>> all coders should be self-delimiting in runner/SDK
> interactions,
> >> >      > >>> requiring length-prefix only when there is an opaque or
> >> >     dynamic-length
> >> >      > >>> value? I assume you mean that at runtime the worker for a
> >> >     given engine
> >> >      > >>> does not know?
> >> >      > >>>
> >> >      > >>> Kenn
> >> >      > >>>
> >> >      > >>> On Tue, Nov 5, 2019 at 3:19 PM Luke Cwik <lc...@google.com> wrote:
> >> >      > >>>
> >> >      > >>>      +1 to what Robert said.
> >> >      > >>>
> >> >      > >>>      On Tue, Nov 5, 2019 at 2:36 PM Robert Bradshaw <rober...@google.com> wrote:
> >> >      > >>>
> >> >      > >>>          The Coder used for State/Timers in a StatefulDoFn
> is
> >> >     pulled out
> >> >      > >>>          of the
> >> >      > >>>          input PCollection. If a Runner needs to partition
> by
> >> >     this coder, it
> >> >      > >>>          should ensure the coder of this PCollection
> matches
> >> >     with the Coder
> >> >      > >>>          used to create the serialized bytes that are used
> >> >     for partitioning
> >> >      > >>>          (whether or not this is length-prefixed).
> >> >      > >>>
> >> >      > >>>          Concretely, the graph looks like
> >> >      > >>>
> >> >      > >>>
> >> >      > >>>          Runner                          SDK Harness
> >> >      > >>>
> >> >      > >>>          WriteToGbk
> >> >      > >>>               |
> >> >      > >>>          ReadFromGbk
> >> >      > >>>               |
> >> >      > >>>          RunnerMapFn.processKeyValue(key, value)
> >> >      > >>>               |
> >> >      > >>>               WriteToDataChannel
> >> >      > >>>                       ------------------------>
> >> >      > >>>                            ReadFromDataChannel
> >> >      > >>>                                          |
> >> >      > >>>                                      (pcIn)
> >> >      > >>>                                          |
> >> >      > >>>                             MyStatefulDoFn.process(key, value)
> >> >      > >>>
> >> >      > >>>          Now the (key part of the) Coder of pcIn, which comes from
> >> >      > >>>          the proto that the Runner sent to the SDK, must match the
> >> >      > >>>          (key part of the) encoding used in WriteToGbk and
> >> >      > >>>          ReadFromGbk. If a LengthPrefix is added in one spot, it
> >> >      > >>>          must be added in the other.
> >> >      > >>>
> >> >      > >>>
> >> >      > >>>          [1]
> >> >      > >>>
> >> >
> https://github.com/apache/beam/blob/release-2.17.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L1183
> >> >      > >>>
> >> >      > >>>          On Tue, Nov 5, 2019 at 1:25 PM Maximilian Michels <m...@apache.org> wrote:
> >> >      > >>>           >
> >> >      > >>>           > Hi,
> >> >      > >>>           >
> >> >      > >>>           > I wanted to get your opinion on something that
> I
> >> >     have been
> >> >      > >>>          struggling
> >> >      > >>>           > with. It is about the coders for state requests
> >> >     in portable
> >> >      > >>>          pipelines.
> >> >      > >>>           >
> >> >      > >>>           > In contrast to "classic" Beam, the Runner is
> not
> >> >     guaranteed
> >> >      > >>>          to know
> >> >      > >>>           > which coder is used by the SDK. If the SDK
> >> >     happens to use a
> >> >      > >>>          standard
> >> >      > >>>           > coder (also known as model coder), we will also
> >> >     have it
> >> >      > >>>          available at the
> >> >      > >>>           > Runner, i.e. if the Runner is written in one of
> >> >     the SDK
> >> >      > >>>          languages (e.g.
> >> >      > >>>           > Java). However, when we do not have a standard
> >> >     coder, we just
> >> >      > >>>          treat the
> >> >      > >>>           > data from the SDK as a blob and just pass it
> >> >     around as bytes.
> >> >      > >>>           >
> >> >      > >>>           > Problem
> >> >      > >>>           > =======
> >> >      > >>>           >
> >> >      > >>>           > In the case of state requests which the SDK
> >> >     Harness authors
> >> >      > >>>          to the
> >> >      > >>>           > Runner, we would like for the key associated
> with
> >> >     the state
> >> >      > >>>          request to
> >> >      > >>>           > match the key of the element which led to
> >> >     initiating the
> >> >      > >>>          state request.
> >> >      > >>>           >
> >> >      > >>>           > Example:
> >> >      > >>>           >
> >> >      > >>>           > Runner                 SDK Harness
> >> >      > >>>           > ------                 -----------
> >> >      > >>>           >
> >> >      > >>>           > KV["key","value"]  --> Process Element
> >> >      > >>>           >                                |
> >> >      > >>>           > LookupState("key") <-- Request state of "key"
> >> >      > >>>           >          |
> >> >      > >>>           >     State["key"]    --> Receive state
> >> >      > >>>           >
> >> >      > >>>           >
> >> >      > >>>           > For stateful DoFns, the Runner partitions the
> >> >     data based on
> >> >      > >>>          the key. In
> >> >      > >>>           > Flink, this partitioning must not change during
> >> >     the lifetime of a
> >> >      > >>>           > pipeline because the checkpointing otherwise
> >> >     breaks[0]. The
> >> >      > >>>          key is
> >> >      > >>>           > extracted from the element and stored encoded.
> >> >      > >>>           >
> >> >      > >>>           > If we have a standard coder, it is basically
> the
> >> >     same as in the
> >> >      > >>>           > "classic" Runner which takes the key and
> >> >     serializes it.
> >> >      > >>>          However, when we
> >> >      > >>>           > have an SDK-specific coder, we basically do not
> >> >     know how it
> >> >      > >>>          encodes. So
> >> >      > >>>           > far, we have been using the coder instantiated
> >> >     from the
> >> >      > >>>          Proto, which is
> >> >      > >>>           > basically a LengthPrefixCoder[ByteArrayCoder]
> or
> >> >     similar[1].
> >> >      > >>>          We have had
> >> >      > >>>           > problems with this because the key encoding of
> >> >     Java SDK state
> >> >      > >>>          requests
> >> >      > >>>           > did not match the key encoding on the Runner
> side
> >> >     [2]. In an
> >> >      > >>>          attempt to
> >> >      > >>>           > fix those, it is now partly broken for portable
> >> >     Python pipelines.
> >> >      > >>>           > Partly, because it "only" affects non-standard
> >> >     coders.
> >> >      > >>>           >
> >> >      > >>>           > Non-standard coders yield the aforementioned
> >> >      > >>>           > LengthPrefixCoder[ByteArrayCoder]. Now, following the usual
> >> >      > >>>           > encoding scheme, we would simply encode the key using this
> >> >      > >>>           > coder. However, for state requests, the Python SDK leaves out
> >> >      > >>>           > the length prefix for certain coders, e.g. for primitives like
> >> >      > >>>           > int or byte. It is possible that one coder uses a length
> >> >      > >>>           > prefix, while another doesn't. We have no way of telling from
> >> >      > >>>           > the Runner side, if a length prefix has been used or not. As a
> >> >      > >>>           > result, the keys do not match on the Runner side and the
> >> >      > >>>           > partitioning is broken.
> >> >      > >>>           >
> >> >      > >>>           >
> >> >      > >>>           > How to solve this?
> >> >      > >>>           > ==================
> >> >      > >>>           >
> >> >      > >>>           > (1) Should this simply be fixed on the Python
> SDK
> >> >     side? One
> >> >      > >>>          fix would be
> >> >      > >>>           > to always append a length prefix to the key in
> state
> >> >      > >>>          requests, even for
> >> >      > >>>           > primitive coders like VarInt which do not use
> one.
> >> >      > >>>           >
> >> >      > >>>           > OR
> >> >      > >>>           >
> >> >      > >>>           > (2) Should the Runner detect that a
> non-standard
> >> >     coder is
> >> >      > >>>          used? If so,
> >> >      > >>>           > it should just pass the bytes from the SDK
> >> >     Harness and never
> >> >      > >>>          make an
> >> >      > >>>           > attempt to construct a coder based on the
> Proto.
> >> >      > >>>           >
> >> >      > >>>           >
> >> >      > >>>           > Thinking about it now, it seems pretty obvious
> >> >     that (2) is
> >> >      > >>>          the most
> >> >      > >>>           > feasible way to avoid complications across all
> >> >     current and
> >> >      > >>>          future SDKs
> >> >      > >>>           > for key encodings. Still, it is odd that the
> >> >     Proto contains coder
> >> >      > >>>           > information which is not usable.
> >> >      > >>>           >
> >> >      > >>>           > What do you think?
> >> >      > >>>           >
> >> >      > >>>           >
> >> >      > >>>           > Thanks,
> >> >      > >>>           > Max
> >> >      > >>>           >
> >> >      > >>>           >
> >> >      > >>>           > [0] It is possible to restart the pipeline and
> >> >     repartition the
> >> >      > >>>           > checkpointed data.
> >> >      > >>>           > [1]
> >> >      > >>>           >
> >> >      > >>>
> >> >
> https://github.com/apache/beam/blob/c39752af5391fe698a2b4f1489c187ddd4d604c0/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java#L682
> >> >      > >>>           > [2]
> https://issues.apache.org/jira/browse/BEAM-8157
> >> >      > >>>
> >> >
>
