On 5/3/23 19:57, Kenneth Knowles wrote:
My big picture hot take: this is useful, but the problem we really
need to solve is topology change, which will obsolete coder evolution.
I think Beam model has a role in this. It isn't just a runner-specific
thing. We need to ensure the model makes it possible/easy to replace
one pipeline with another, and to define how that should look. For
example, a composite PTransform with the same input/output types
should have its internals replaced in some blue/green way in some
cases. This would of course include lots of coder changes without any
notion of compatibility possible. And for a pipeline that looks pretty
much the same except for some encoding change, we should definitely
see if we can define a more localized migration process.
Yes, a change of coder can be viewed as a simple change in topology (if
we assume that the coder is a property of the edge connecting two
nodes). In this sense, if we were to provide a way to upgrade topology,
then this would be solved as well. On the other hand, I'm a little
afraid that a fully generic change in topology is unachievable, because
the state stored in the previous Pipeline might not have any relation to
what should be stored in the upgraded Pipeline. There can be (probably
many) cases where it is possible, though.
I haven't thought about this in a while so I don't have technical
proposals for achieving it yet. Ultimately as a user of Beam and/or
any runner I would consider being able to run a brand new unrelated
pipeline and hot swap it for the live one to be a prerequisite to
production-readiness, but it has been a while since this was what I
did day-to-day.
Agree, the ability to do this is indeed necessary. The argument is that
this might be costly. It would help if we provided a way to bootstrap
state for a streaming Pipeline via a batch Pipeline (and even better, in
a runner-agnostic way, so that users could use different runners for
both). But this is a completely different topic. :-)
Returning to my original motivation - all this would be a lot of work,
so I think it is reasonable to propose a shortcut - deprecate KryoCoder
and introduce Kryo5Coder (or wait for Kryo 6) as an alternative, and let
users handle the transition themselves.
Jan
Kenn
On Wed, May 3, 2023 at 8:07 AM Byron Ellis via dev
<dev@beam.apache.org> wrote:
I think I'm not understanding the use case here? Are we talking
about encoding of data in motion (e.g. between stages of a
streaming pipeline) or data at rest? (e.g. input formats and
output formats) Or maybe something else?
This is both data in motion (between stages of Pipeline) and at rest,
but inside the Pipeline (state). Not input/output formats.
On Wed, May 3, 2023 at 6:58 AM Jan Lukavský <je...@seznam.cz> wrote:
Hi,
I'd like to discuss a topic that from time to time appears in different
contexts (e.g. [1]). I'd like to restate the problem in a slightly more
generic way as: "Should we have a way to completely exchange the coder
of a PCollection/state of a _running_ Pipeline?". First, my motivation
for this question - Beam has an extension called
beam-sdks-java-extensions-kryo, which contains a KryoCoder. This coder
uses Kryo [2] to serialize virtually any Java class into a binary
format. Unfortunately, this binary representation differs between Kryo
versions, and it does not contain any way to recognize which version of
Kryo was used to serialize the data. An attempt to deserialize bytes
produced by an incompatible version of Kryo results in an exception. The
current version of Kryo used by the KryoCoder is already more than 5
years old, and an upgrade to a newer version is needed, because the
current version does not work with JDK17+ [3]. Thus, the only option
seems to be the creation of a different Coder (e.g. Kryo5Coder), but
then we need the ability to transfer Pipelines using the old KryoCoder
to the newer one. That is, we need to completely switch the coder that
encodes a PCollection and/or state.
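To make the root cause concrete: raw Kryo output carries no version marker, so a reader cannot even tell which Kryo produced the bytes. A minimal sketch of one possible mitigation - a wrapper that prefixes every encoded value with a format-version byte - is below. This is not Beam or Kryo code; the `Coder` interface here is a tiny stand-in for Beam's `Coder<T>`, and `VersionedCoder` is a hypothetical name, purely for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class VersionedCoderSketch {

  // Minimal stand-in for Beam's Coder<T> (the real KryoCoder extends CustomCoder<T>).
  interface Coder<T> {
    void encode(T value, OutputStream out) throws IOException;
    T decode(InputStream in) throws IOException;
  }

  // Wraps a delegate coder and tags every payload with a version byte, so a
  // future coder (e.g. one based on Kryo 5) can recognize and reject (or
  // migrate) bytes written by an older, incompatible serializer.
  static final class VersionedCoder<T> implements Coder<T> {
    private final byte version;
    private final Coder<T> delegate;

    VersionedCoder(byte version, Coder<T> delegate) {
      this.version = version;
      this.delegate = delegate;
    }

    @Override
    public void encode(T value, OutputStream out) throws IOException {
      out.write(version);          // version marker comes first
      delegate.encode(value, out); // then the delegate's (e.g. Kryo) payload
    }

    @Override
    public T decode(InputStream in) throws IOException {
      int tag = in.read();
      if (tag != version) {
        // In a real migration path this is where old-version bytes could be
        // routed to the legacy coder instead of failing.
        throw new IOException("Unknown coder version: " + tag);
      }
      return delegate.decode(in);
    }
  }

  // Trivial delegate coder for the demo: UTF-8 strings with a single-byte
  // length prefix (fine for strings shorter than 256 bytes).
  static final Coder<String> STRING_CODER = new Coder<String>() {
    @Override
    public void encode(String value, OutputStream out) throws IOException {
      byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
      out.write(bytes.length);
      out.write(bytes);
    }

    @Override
    public String decode(InputStream in) throws IOException {
      int len = in.read();
      byte[] bytes = new byte[len];
      if (in.read(bytes) != len) {
        throw new IOException("Truncated input");
      }
      return new String(bytes, StandardCharsets.UTF_8);
    }
  };

  public static void main(String[] args) throws IOException {
    Coder<String> v1 = new VersionedCoder<>((byte) 1, STRING_CODER);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    v1.encode("hello", out);
    byte[] data = out.toByteArray();
    System.out.println("roundtrip=" + v1.decode(new ByteArrayInputStream(data)));

    // A coder expecting a different version rejects the bytes instead of
    // silently mis-decoding them - exactly what raw Kryo output cannot do.
    Coder<String> v2 = new VersionedCoder<>((byte) 2, STRING_CODER);
    try {
      v2.decode(new ByteArrayInputStream(data));
    } catch (IOException e) {
      System.out.println("rejected=" + e.getMessage());
    }
  }
}
```

Of course, this only helps data encoded from now on; state already written by the existing KryoCoder still has no marker, which is exactly why the migration question below arises.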
We have therefore the following options:
1) Simply ignore this and let users rerun the Pipeline from scratch.
This is always applicable, but for some Pipelines it might be costly to
reprocess all historical data.
2) We can create the new Coder and let users use a runner-specific way
to convert the Pipeline. E.g. in the case of Flink, this could be done
by converting a savepoint into the new format. This requires knowledge
of how Beam stores state (namespaces) and is kind of involved on the
user side. We could probably provide runner-specific tools for this, but
some runners, in general, might not allow such state manipulation.
3) We can include the information of a Coder update in the Pipeline,
resubmit it to the runner, and let the runner handle it. Upon Pipeline
restart, a runner would have to convert all state and all inflight data
from the old Coder to the new one before resuming the Pipeline.
Option 3) seems like the most natural, but it requires support on the
runner side.
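The core of option 3) is a mechanical re-encoding pass: decode every stored value with the coder that wrote it, encode it with the replacement. A minimal sketch of that step, under the simplifying assumption that state is a flat map from cell key to raw bytes (real runner state layouts, namespaces, and inflight buffers are far richer); the `Coder` interface and both coders here are stand-ins, not Beam or Kryo APIs:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class StateMigrationSketch {

  // Minimal stand-in for Beam's Coder<T>, byte-array flavored for brevity.
  interface Coder<T> {
    byte[] encode(T value) throws IOException;
    T decode(byte[] bytes) throws IOException;
  }

  // Re-encodes every stored value: decode with the coder that wrote the
  // state, encode with the replacement coder. A runner supporting option 3)
  // would run the equivalent of this over all state cells and inflight
  // elements before resuming the Pipeline.
  static <T> Map<String, byte[]> migrateState(
      Map<String, byte[]> state, Coder<T> oldCoder, Coder<T> newCoder) throws IOException {
    Map<String, byte[]> migrated = new LinkedHashMap<>();
    for (Map.Entry<String, byte[]> cell : state.entrySet()) {
      migrated.put(cell.getKey(), newCoder.encode(oldCoder.decode(cell.getValue())));
    }
    return migrated;
  }

  public static void main(String[] args) throws IOException {
    // Two incompatible stand-in encodings of the same logical value: plain
    // UTF-8 plays the role of "old Kryo", a 'v5:'-prefixed form "new Kryo".
    Coder<String> oldCoder = new Coder<String>() {
      public byte[] encode(String v) { return v.getBytes(StandardCharsets.UTF_8); }
      public String decode(byte[] b) { return new String(b, StandardCharsets.UTF_8); }
    };
    Coder<String> newCoder = new Coder<String>() {
      public byte[] encode(String v) { return ("v5:" + v).getBytes(StandardCharsets.UTF_8); }
      public String decode(byte[] b) {
        return new String(b, StandardCharsets.UTF_8).substring(3);
      }
    };

    Map<String, byte[]> state = new LinkedHashMap<>();
    state.put("user/key1", oldCoder.encode("count=42"));

    Map<String, byte[]> migrated = migrateState(state, oldCoder, newCoder);
    System.out.println(new String(migrated.get("user/key1"), StandardCharsets.UTF_8));
  }
}
```

The hard part a runner would have to solve is everything around this loop: draining inflight data, locating every state cell that used the old coder, and doing the rewrite atomically with respect to the restart.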
I leave the details of how a runner would do this open; I'm currently
interested in knowing the community's position on this.
Jan
[1] https://lists.apache.org/thread/z2m1hg4l5k2kb7nhjkv2lnwf8g4t9wps
[2] https://github.com/EsotericSoftware/kryo
[3] https://github.com/EsotericSoftware/kryo/issues/885