Hi Aljoscha,

I opened https://issues.apache.org/jira/browse/FLINK-8715 for the
RocksDB issue with pointers to the code. Let me know if you need more
details.

Best,

Arvid

On Tue, Feb 20, 2018 at 1:04 PM, Arvid Heise <arvid.he...@gmail.com> wrote:
> Hi Aljoscha, hi Till,
>
> @Aljoscha, the new AvroSerializer is almost what I wanted except that
> it does not use the schema of the snapshot while reading. In fact,
> this version will fail with the same error as before when a field is
> added or removed.
> https://github.com/apache/flink/blob/f3a2197a23524048200ae2b4712d6ed833208124/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java#L265
> needs to use the schema from
> https://github.com/apache/flink/blob/f3a2197a23524048200ae2b4712d6ed833208124/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java#L188
> as the first parameter. Accordingly, a readSchema field needs to be set
> in #ensureCompatibility and relayed in #duplicate.
> Should I add a ticket for that as well?
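A minimal sketch of why the snapshot's schema has to be passed to the reader, assuming a toy schema made of ordered field names plus defaults. ToySchema, encode and decode are hypothetical stand-ins, not the Avro or Flink API. The bytes are decoded with the schema they were written with and then projected onto the schema the job uses now:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of reader/writer schema resolution; not the Avro API. */
public class SchemaResolutionSketch {

    /** Hypothetical stand-in for a schema: ordered field names plus defaults. */
    static final class ToySchema {
        final List<String> fields;
        final Map<String, String> defaults;

        ToySchema(List<String> fields, Map<String, String> defaults) {
            this.fields = fields;
            this.defaults = defaults;
        }
    }

    /** Writes the values in the field order of the given schema. */
    static List<String> encode(ToySchema schema, Map<String, String> record) {
        List<String> out = new ArrayList<>();
        for (String field : schema.fields) {
            out.add(record.get(field));
        }
        return out;
    }

    /**
     * Decodes with the writer schema (the one stored in the snapshot) and
     * projects the result onto the reader schema (the one the job uses now).
     */
    static Map<String, String> decode(ToySchema writer, ToySchema reader, List<String> data) {
        Map<String, String> written = new LinkedHashMap<>();
        for (int i = 0; i < writer.fields.size(); i++) {
            written.put(writer.fields.get(i), data.get(i));
        }
        Map<String, String> result = new LinkedHashMap<>();
        for (String field : reader.fields) {
            // Fields unknown to the writer fall back to the reader's default;
            // fields unknown to the reader are dropped.
            String value = written.containsKey(field) ? written.get(field) : reader.defaults.get(field);
            result.put(field, value);
        }
        return result;
    }

    public static void main(String[] args) {
        ToySchema v1 = new ToySchema(List.of("id", "name"), Map.of());
        ToySchema v2 = new ToySchema(List.of("id", "name", "email"), Map.of("email", "unknown"));

        List<String> stored = encode(v1, Map.of("id", "1", "name", "Ada"));
        // prints {id=1, name=Ada, email=unknown}
        System.out.println(decode(v1, v2, stored));
    }
}
```

In this toy model, decoding the stored data with v2 as the writer schema would read past the end of the data, which is the analogue of the failure when a field is added or removed.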
>
> @Till concerning the poor man's migration. The doc of
> #ensureCompatibility in 1.3.2 states:
>
> <li>{@link CompatibilityResult#compatible()}: this signals Flink that
> *     this serializer is compatible, or has been reconfigured to be
> *     compatible, to continue reading previous data, and that the
> *     serialization schema remains the same. No migration needs to be
> *     performed.</li>
>
> The important part is the reconfiguration, which is also mentioned in
> the main documentation. The default Avro and Kryo serializers actually
> try to reconfigure themselves.
>
> @Aljoscha, I will open a ticket for the RocksDB thingy. I pinned the
> problem down and will try to come up with an easy solution. It's a tad
> hard to compare the different versions (since I'm deep into the
> debugger), so I just might write a 1.3.2 ticket.
>
> @Till, thanks for reminding me that we are not talking about
> incremental checkpoints ;) That makes it indeed much easier to
> understand the whole state recovery with evolution.
>
> Best,
>
> Arvid
>
> On Tue, Feb 20, 2018 at 12:27 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>> Hi Arvid,
>>
>> Did you check out the most recent AvroSerializer code?
>> https://github.com/apache/flink/blob/f3a2197a23524048200ae2b4712d6ed833208124/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java#L185
>> I think this does what you're suggesting.
>>
>> Regarding the integration tests, if this is in fact the case it is not good
>> and I would be very happy about a Jira Issue/PR there.
>>
>> Regarding your last point, I think that the RocksDB backend stores the
>> metadata, which includes the type serialiser snapshot, once, and not for
>> each key or key group.
>>
>> Best,
>> Aljoscha
>>
>>
>> On 20. Feb 2018, at 11:40, Arvid Heise <arvid.he...@gmail.com> wrote:
>>
>> Hi guys,
>>
>> I just wanted to write about that topic myself.
>>
>> Tzu-Li's FF talk also gave me the impression that by just using
>> AvroSerializer, we get some kind of state evolution for free.
>> https://www.slideshare.net/FlinkForward/flink-forward-berlin-2017-tzuli-gordon-tai-managing-state-in-apache-flink
>>
>> However, I discovered two issues on 1.3.2:
>>
>> 1. The AvroSerializer does not use read/write schema. The snapshot
>> stores type information instead of the more plausible schema
>> information.
>> However, the actual type should not matter as long as a compatible
>> type is used for state restoration.
>> I have rewritten the AvroSerializer to store the schema in the
>> snapshot config and to actually use it as a read schema during the
>> initialization of the DatumReader.
>>
>> 2. During integration tests, it turned out that the current
>> implementation of the StateDescriptor always returns copies of the
>> serializer through #getSerializer. So #ensureCompatibility is invoked
>> on a different serializer instance than the one that later runs
>> #deserialize. So although my AvroSerializer sets the correct read
>> schema, it is not used, since it is set on the wrong instance.
>> I propose to make sure that #ensureCompatibility is invoked on the
>> original serializer in the state descriptor. Otherwise all adjustments
>> to the serializer are lost.
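A minimal sketch of the second issue, assuming a descriptor that hands out a fresh copy on every #getSerializer call. ToySerializer, ToyDescriptor and getOriginalSerializer are hypothetical names for illustration, not Flink's classes or API:

```java
/** Toy model of the copy problem; the classes below are hypothetical, not Flink's API. */
public class SerializerCopySketch {

    /** Hypothetical serializer that can be reconfigured with a read schema. */
    static final class ToySerializer {
        private String readSchema; // null until reconfigured

        /** Stand-in for #ensureCompatibility: remember the snapshot's schema. */
        void ensureCompatibility(String snapshotSchema) {
            this.readSchema = snapshotSchema;
        }

        /** Stand-in for #duplicate: the copy carries the read schema along. */
        ToySerializer duplicate() {
            ToySerializer copy = new ToySerializer();
            copy.readSchema = this.readSchema;
            return copy;
        }

        String readSchema() {
            return readSchema;
        }
    }

    /** Hypothetical descriptor that returns a fresh copy on every call. */
    static final class ToyDescriptor {
        private final ToySerializer original = new ToySerializer();

        ToySerializer getSerializer() {
            return original.duplicate();
        }

        /** Assumed accessor for the proposed fix: reconfigure the original. */
        ToySerializer getOriginalSerializer() {
            return original;
        }
    }

    public static void main(String[] args) {
        // Reconfiguring a copy: the next copy knows nothing about it.
        ToyDescriptor broken = new ToyDescriptor();
        broken.getSerializer().ensureCompatibility("schemaV1");
        System.out.println(broken.getSerializer().readSchema()); // prints null

        // Reconfiguring the original: every later copy inherits the schema.
        ToyDescriptor fixed = new ToyDescriptor();
        fixed.getOriginalSerializer().ensureCompatibility("schemaV1");
        System.out.println(fixed.getSerializer().readSchema()); // prints schemaV1
    }
}
```

The sketch only shows the instance mix-up. It says nothing about how the real state backends obtain or cache their serializers.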
>>
>> I can provide tests and patches if needed.
>>
>> One related question:
>>
>> If I do an incremental snapshot with RocksDB backend and keyed state
>> backend, is the snapshot config attached to all keys? So would the
>> following work:
>> * Write (key1, value1) and (key2, value2) with schema1. Cancel with
>> snapshot.
>> * Read (key1, value1) with schema1->schema2 and write (key1, value1)
>> back. Cancel with snapshot.
>> <Now we have two different schemas in the snapshots>
>> * Read (key1, value1) with schema2 and read (key2, value2) with
>> schema1->schema2.
>>
>> Thanks for any feedback
>>
>> Arvid
>>
>> On Mon, Feb 19, 2018 at 7:17 PM, Niels Denissen <nielsdenis...@gmail.com> wrote:
>>
>> Hi Till,
>>
>> Thanks for the quick reply, I'm using 1.3.2 atm.
>>
>> Cheers,
>> Niels
>>
>> On Feb 19, 2018 19:10, "Till Rohrmann" <trohrm...@apache.org> wrote:
>>
>>
>> Hi Niels,
>>
>> which version of Flink are you using? Currently, Flink does not support
>> upgrading the TypeSerializer itself, if I'm not mistaken. As you've described,
>> it will try to use the old serializer stored in the checkpoint stream to
>> restore state.
>>
>> I've pulled Gordon into the conversation; he can tell you a little bit
>> more about the current capabilities and limitations of state evolution.
>>
>> Cheers,
>> Till
>>
>> On Mon, Feb 19, 2018 at 4:14 PM, Niels <[hidden email]> wrote:
>>
>>
>> Hi all,
>>
>> I'm currently trying to use Avro in order to evolve our data present in
>> Flink's Managed State. I've extended the TypeSerializer class successfully
>> for this purpose, but still have issues using Schema Evolution.
>>
>> *The problem:*
>> When we try to read data (deserialize from savepoint) with a new serialiser
>> and a new schema, Flink seems to use the old schema of the old serializer
>> (written to the savepoint). This results in an old GenericRecord that
>> doesn't adhere to the new Avro schema.
>>
>> *What seems to happen is the following* (say we evolve from dataV1 to
>> dataV2):
>> - State containing dataV1 is serialized with Avro schema V1 to a
>> check/savepoint. Along with the data, the serializer itself is written.
>> - Upon restore, the old serializer is retrieved from the data (and therefore
>> needs to be on the classpath). Data is restored using this old serializer.
>> The new serializer provided is only used for writes.
>>
>> If this is indeed the case, it explains our aforementioned problem. If you
>> have any pointers as to whether this is true and what a possible solution
>> would be, that would be very much appreciated!
>>
>> Thanks!
>> Niels
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/