Hi guys, I was about to write about this topic myself.
Tzu-Li's Flink Forward talk also gave me the impression that simply using the AvroSerializer gives us some kind of state evolution for free:
https://www.slideshare.net/FlinkForward/flink-forward-berlin-2017-tzuli-gordon-tai-managing-state-in-apache-flink

However, I discovered two issues on 1.3.2:

1. The AvroSerializer does not use read/write schemas. The snapshot stores type information instead of the more plausible schema information. However, the actual type should not matter as long as a compatible type is used for state restoration. I have rewritten the AvroSerializer to store the schema in the snapshot config and to actually use it as a read schema when initializing the DatumReader.

2. During integration tests, it turned out that the current implementation of the StateDescriptor always returns copies of the serializer through #getSerializer. So #ensureCompatibility is invoked on a different serializer instance than the one whose #deserialize method is actually called. Although my AvroSerializer sets the correct read schema, it is not used, because it is set on the wrong instance.

I propose to make sure that #ensureCompatibility is invoked on the original serializer in the state descriptor. Otherwise all adjustments to the serializer are lost.

I can provide tests and patches if needed.

One related question: if I take an incremental snapshot with the RocksDB backend and keyed state backend, is the snapshot config attached to all keys? That is, would the following work?

* Write (key1, value1) and (key2, value2) with schema1. Cancel with snapshot.
* Read (key1, value1) with schema1->schema2 and write (key1, value1). Cancel with snapshot.
  <Now we have two different schemas in the snapshots>
* Read (key1, value1) with schema2 and read (key2, value2) with schema1->schema2.

Thanks for any feedback

Arvid

On Mon, Feb 19, 2018 at 7:17 PM, Niels Denissen <nielsdenis...@gmail.com> wrote:
> Hi Till,
>
> Thanks for the quick reply, I'm using 1.3.2 atm.
>
> Cheers,
> Niels
>
> On Feb 19, 2018 19:10, "Till Rohrmann" <trohrm...@apache.org> wrote:
>>
>> Hi Niels,
>>
>> which version of Flink are you using? Currently, Flink does not support to
>> upgrade the TypeSerializer itself, if I'm not mistaken. As you've described,
>> it will try to use the old serializer stored in the checkpoint stream to
>> restore state.
>>
>> I've pulled Gordon into the conversation who can tell you a little bit
>> more about the current capability and limitations of state evolution.
>>
>> Cheers,
>> Till
>>
>> On Mon, Feb 19, 2018 at 4:14 PM, Niels <[hidden email]> wrote:
>>>
>>> Hi all,
>>>
>>> I'm currently trying to use Avro in order to evolve our data present in
>>> Flink's Managed State. I've extended the TypeSerializer class successfully
>>> for this purpose, but still have issues using Schema Evolution.
>>>
>>> *The problem:*
>>> When we try to read data (deserialize from savepoint) with a new serialiser
>>> and a new schema, Flink seems to use the old schema of the old serializer
>>> (written to the savepoint). This results in an old GenericRecord that
>>> doesn't adhere to the new Avro schema.
>>>
>>> *What seems to happen to me is the following* (Say we evolve from dataV1 to
>>> dataV2):
>>> - State containing dataV1 is serialized with avro schema V1 to a
>>> check/savepoint. Along with the data, the serializer itself is written.
>>> - Upon restore, the old serializer is retrieved from the data (therefore
>>> needs to be on the classpath). Data is restored using this old serializer.
>>> The new serializer provided is only used for writes.
>>>
>>> If this is indeed the case it explains our aforementioned problem. If you
>>> have any pointers as to whether this is true and what a possible solution
>>> would be that would be very much appreciated!
>>>
>>> Thanks!
>>> Niels
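
P.S. To make issue 2 from the top of this message a bit more concrete, here is a minimal, dependency-free sketch of how a descriptor that hands out serializer copies can lose the schema set during the compatibility check. This is not Flink code: ToySerializer, ToyDescriptor and SerializerCopyDemo are hypothetical names, and the copy-on-get behaviour is only assumed from the description of StateDescriptor#getSerializer above.

```java
/** Toy stand-in for a serializer; hypothetical, not Flink's TypeSerializer. */
class ToySerializer {
    private final String readerSchema; // schema of the currently running job
    private String writerSchema;       // schema found in the snapshot, null until configured

    ToySerializer(String readerSchema) {
        this.readerSchema = readerSchema;
    }

    /** Mimics a compatibility check that configures the instance it is called on. */
    void ensureCompatibility(String schemaFromSnapshot) {
        this.writerSchema = schemaFromSnapshot;
    }

    /** Reports which writer->reader schema pair this instance would decode with. */
    String deserializeResolution() {
        String writer = (writerSchema != null) ? writerSchema : readerSchema;
        return writer + "->" + readerSchema;
    }

    /** Copies the serializer, including any writer schema configured so far. */
    ToySerializer duplicate() {
        ToySerializer copy = new ToySerializer(readerSchema);
        copy.writerSchema = writerSchema;
        return copy;
    }
}

/** Toy stand-in for a state descriptor; hypothetical, not Flink's StateDescriptor. */
class ToyDescriptor {
    private final ToySerializer original;
    private final boolean copyOnGet; // assumption: true models the behaviour described above

    ToyDescriptor(ToySerializer original, boolean copyOnGet) {
        this.original = original;
        this.copyOnGet = copyOnGet;
    }

    ToySerializer getSerializer() {
        return copyOnGet ? original.duplicate() : original;
    }
}

class SerializerCopyDemo {
    /** Runs the compatibility check and the read through two separate getSerializer() calls. */
    static String restore(ToyDescriptor descriptor) {
        descriptor.getSerializer().ensureCompatibility("schema1");
        return descriptor.getSerializer().deserializeResolution();
    }

    public static void main(String[] args) {
        // Copy per call: the writer schema lands on a throwaway copy and is lost.
        System.out.println(restore(new ToyDescriptor(new ToySerializer("schema2"), true)));
        // Shared instance: the read sees the writer schema set during the check.
        System.out.println(restore(new ToyDescriptor(new ToySerializer("schema2"), false)));
    }
}
```

With the copying descriptor the read resolves as schema2->schema2, so the old schema is ignored. With the shared instance it resolves as schema1->schema2, which is the effect the proposal above is after.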