I don't think I understand the use case here. Are we talking about the
encoding of data in motion (e.g. between stages of a streaming pipeline) or
data at rest (e.g. input formats and output formats)? Or maybe something
else?

On Wed, May 3, 2023 at 6:58 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi,
>
> I'd like to discuss a topic that appears from time to time in different
> contexts (e.g. [1]). I'd like to restate the problem in a slightly more
> generic way as: "Should we have a way to completely exchange the coder of
> a PCollection/state of a _running_ Pipeline?". First, my motivation for
> this question: Beam has an extension called
> beam-sdks-java-extensions-kryo, which contains a KryoCoder. This coder
> uses Kryo [2] to serialize virtually any Java class into binary format.
> Unfortunately, this binary representation differs between Kryo versions
> and it does not contain any way to recognize which version of Kryo was
> used to serialize the data. An attempt to deserialize bytes produced by
> an incompatible version of Kryo results in an exception. The current
> version of Kryo used by the KryoCoder is already more than 5 years old,
> and an upgrade to a newer version is needed because the current version
> does not work with JDK17+ [3]. Thus, the only option seems to be the
> creation of a different Coder (e.g. Kryo5Coder), but then we need the
> ability to migrate Pipelines using the old KryoCoder to the newer one.
> That is, we need to completely switch the coder that encodes a
> PCollection and/or state.
>
> We therefore have the following options:
>
>   1) Simply ignore this and let users rerun the Pipeline from scratch.
> This is always possible and should work in general, but, if nothing
> else, for some Pipelines it might be costly to reprocess all historical
> data.
>
>   2) We can create the new Coder and let users use a runner-specific way
> to convert the Pipeline. E.g. in the case of Flink, this could be done by
> converting a savepoint into the new format. This requires knowledge of
> how Beam stores state (namespaces) and is fairly involved on the user
> side. We could probably provide runner-specific tools for this, but some
> runners, in general, might not allow such state manipulation.
>
>   3) We can include information about the Coder update in the Pipeline,
> resubmit it to the runner, and let the runner handle it. Upon Pipeline
> restart, the runner would have to convert all state and all in-flight
> data from the old Coder to the new one before resuming the Pipeline.
>
> Option 3) seems like the most natural, but it requires support on the
> runner side.
>
> I leave the details of how a runner would do this open; for now I'm
> interested in knowing the community's position on this.
>
>   Jan
>
> [1] https://lists.apache.org/thread/z2m1hg4l5k2kb7nhjkv2lnwf8g4t9wps
>
> [2] https://github.com/EsotericSoftware/kryo
>
> [3] https://github.com/EsotericSoftware/kryo/issues/885
>
>
