I don't think I understand the use case here. Are we talking about the encoding of data in motion (e.g. between stages of a streaming pipeline) or data at rest (e.g. input and output formats)? Or maybe something else?
On Wed, May 3, 2023 at 6:58 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi,
>
> I'd like to discuss a topic that appears from time to time in different
> contexts (e.g. [1]). I'd like to restate the problem in a slightly more
> generic way as: "Should we have a way to completely exchange the coder of
> a PCollection/state of a _running_ Pipeline?". First, my motivation for
> this question: Beam has an extension called
> beam-sdks-java-extensions-kryo, which contains a KryoCoder. This coder
> uses Kryo [2] to serialize virtually any Java class into a binary format.
> Unfortunately, this binary representation differs between Kryo versions
> and does not contain any way to recognize which version of Kryo was
> used to serialize the data. An attempt to deserialize bytes produced by
> an incompatible version of Kryo results in an exception. The current
> version of Kryo used by the KryoCoder is already more than 5 years old
> and an upgrade to a newer version is needed, because the current
> version does not work with JDK17+ [3]. Thus, the only option seems to be
> the creation of a different Coder (e.g. Kryo5Coder), but then we need
> the ability to transfer Pipelines using the old KryoCoder to the newer
> one. That is, we need to completely switch the coder that encodes a
> PCollection and/or state.
>
> We therefore have the following options:
>
> 1) Simply ignore this and let users rerun the Pipeline from scratch.
> This is possible and should essentially always be applicable, but, if
> nothing else, for some Pipelines it might be costly to reprocess all
> historical data.
>
> 2) We can create the new Coder and let users use a runner-specific way
> to convert the Pipeline. E.g. in the case of Flink, this could be done by
> converting a savepoint into the new format. This requires knowledge of
> how Beam stores state (namespaces) and is rather involved on the user
> side. We could probably provide runner-specific tools for this, but some
> runners, in general, might not allow such state manipulation.
>
> 3) We can include the information about a Coder update in the Pipeline,
> resubmit it to the runner, and let the runner handle it. Upon
> Pipeline restart, a runner would have to convert all state and all
> in-flight data from the old Coder to the new one before resuming the
> Pipeline.
>
> Option 3) seems like the most natural, but it requires support on the
> runner side.
>
> I leave the details of how a runner would do this open; I'm currently
> interested in knowing the community's position on this.
>
> Jan
>
> [1] https://lists.apache.org/thread/z2m1hg4l5k2kb7nhjkv2lnwf8g4t9wps
>
> [2] https://github.com/EsotericSoftware/kryo
>
> [3] https://github.com/EsotericSoftware/kryo/issues/885
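
For illustration only, the conversion step described in option 3 of the quoted message (decode everything with the old Coder, re-encode it with the new one before resuming) might look roughly like the sketch below. It rests on loud assumptions: `BytesCoder`, `migrate`, and the two toy coders are hypothetical names invented for this example, not Beam or Kryo APIs, and "state" is modelled as a plain in-memory map rather than anything a real runner stores.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for a coder; this is NOT Beam's Coder API. */
interface BytesCoder<T> {
  byte[] encode(T value);

  T decode(byte[] bytes);
}

final class CoderMigrationSketch {

  /** Re-encodes every stored value: decode with the old coder, encode with the new one. */
  static <T> Map<String, byte[]> migrate(
      Map<String, byte[]> oldState, BytesCoder<T> oldCoder, BytesCoder<T> newCoder) {
    Map<String, byte[]> newState = new LinkedHashMap<>();
    for (Map.Entry<String, byte[]> entry : oldState.entrySet()) {
      T value = oldCoder.decode(entry.getValue());
      newState.put(entry.getKey(), newCoder.encode(value));
    }
    return newState;
  }

  /** Toy "old" binary format (assumption for the demo): UTF-8 bytes. */
  static final BytesCoder<String> OLD =
      new BytesCoder<String>() {
        public byte[] encode(String value) {
          return value.getBytes(StandardCharsets.UTF_8);
        }

        public String decode(byte[] bytes) {
          return new String(bytes, StandardCharsets.UTF_8);
        }
      };

  /** Toy "new" binary format (assumption for the demo): UTF-16BE bytes. */
  static final BytesCoder<String> NEW =
      new BytesCoder<String>() {
        public byte[] encode(String value) {
          return value.getBytes(StandardCharsets.UTF_16BE);
        }

        public String decode(byte[] bytes) {
          return new String(bytes, StandardCharsets.UTF_16BE);
        }
      };

  public static void main(String[] args) {
    Map<String, byte[]> state = new LinkedHashMap<>();
    state.put("key-1", OLD.encode("hello"));

    // Convert all stored values before the (imaginary) pipeline resumes.
    Map<String, byte[]> migrated = migrate(state, OLD, NEW);
    System.out.println(NEW.decode(migrated.get("key-1")));
  }
}
```

The sketch only works because the old coder is still available at migration time to decode the old bytes; it says nothing about how a runner would locate state or in-flight data, which the quoted message deliberately leaves open.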