Hi Apoorv,

Flink currently does not natively support schema evolution for state types
using Scala case classes [1].

So, as Roman has pointed out, there are two possible ways for you to do that:
- implementing a custom serializer that supports schema evolution for your
specific Scala case classes, or
- using the State Processor API [2] to migrate your case classes offline
as a batch job.

For your question on how to implement a schema-evolution-aware serializer:
could you share the problems you have run into so far? Otherwise, the
PojoSerializerSnapshot class would be a starting point for implementing
something similar for your case classes; I've sketched the general idea
below.
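
To make that concrete, here is a minimal, untested sketch (written against
the Flink 1.9+ serializer API) of one common technique: prefixing every
record with a format-version byte so that deserialize() can decode both the
old and the new layout. The case class below is a simplified stand-in for
yours, the version numbers are made up, and note that this only helps for
state that this serializer wrote in the first place.

import org.apache.flink.api.common.typeutils.{SimpleTypeSerializerSnapshot, TypeSerializer, TypeSerializerSnapshot}
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
import org.apache.flink.core.memory.{DataInputView, DataOutputView}

// Simplified stand-in for your class; in version 2 it gained the field `e`.
case class MyCaseClass(a: Boolean, b: Boolean, e: Int)

class MyCaseClassSerializer extends TypeSerializerSingleton[MyCaseClass] {
  private val CurrentVersion: Byte = 2 // bump whenever the wire layout changes

  override def isImmutableType: Boolean = true
  override def createInstance(): MyCaseClass = MyCaseClass(false, false, 0)
  override def copy(from: MyCaseClass): MyCaseClass = from
  override def copy(from: MyCaseClass, reuse: MyCaseClass): MyCaseClass = from
  override def getLength: Int = -1 // variable-length records

  override def serialize(record: MyCaseClass, target: DataOutputView): Unit = {
    target.writeByte(CurrentVersion) // tag each record with its format version
    target.writeBoolean(record.a)
    target.writeBoolean(record.b)
    target.writeInt(record.e)
  }

  override def deserialize(source: DataInputView): MyCaseClass =
    source.readByte() match {
      case 1 => // old layout without `e`: fall back to a default
        MyCaseClass(source.readBoolean(), source.readBoolean(), e = 0)
      case 2 =>
        MyCaseClass(source.readBoolean(), source.readBoolean(), source.readInt())
      case v =>
        throw new IllegalStateException(s"Unknown format version: $v")
    }

  override def deserialize(reuse: MyCaseClass, source: DataInputView): MyCaseClass =
    deserialize(source)

  override def copy(source: DataInputView, target: DataOutputView): Unit =
    serialize(deserialize(source), target)

  override def snapshotConfiguration(): TypeSerializerSnapshot[MyCaseClass] =
    new MyCaseClassSerializerSnapshot
}

// The snapshot can stay simple because versioning happens per record above.
class MyCaseClassSerializerSnapshot extends SimpleTypeSerializerSnapshot[MyCaseClass](
  new java.util.function.Supplier[TypeSerializer[MyCaseClass]] {
    override def get(): TypeSerializer[MyCaseClass] = new MyCaseClassSerializer
  })

You would then plug it in through your state descriptors, e.g.
new ValueStateDescriptor("my-state", new MyCaseClassSerializer).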

As you will quickly realize, it's not simple, so I would strongly suggest
trying the State Processor API approach instead; a rough sketch of such a
migration job follows at the end of this mail.
Either way, if you bump into any problems, feel free to let me know.
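
For the State Processor API route, a migration job could look roughly like
the following. This is a hedged sketch, not working code: the uid
"my-operator-uid", the state name "case-class-state", the savepoint paths,
and the MyCaseClass/NewCaseClass definitions are all placeholders for your
job, and the State Processor API itself needs a Flink 1.9+ runtime for the
batch job.

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.state.api.{OperatorTransformation, Savepoint}
import org.apache.flink.state.api.functions.{KeyedStateBootstrapFunction, KeyedStateReaderFunction}
import org.apache.flink.util.Collector

// Hypothetical old and new shapes of the state.
case class MyCaseClass(a: Boolean, b: Boolean)
case class NewCaseClass(key: String, a: Boolean, b: Boolean, e: Int)

// Reads the old case class out of the existing savepoint.
class OldStateReader extends KeyedStateReaderFunction[String, NewCaseClass] {
  @transient private var state: ValueState[MyCaseClass] = _

  override def open(parameters: Configuration): Unit =
    // Scala TypeInformation, so the descriptor matches how the state was written.
    state = getRuntimeContext.getState(new ValueStateDescriptor(
      "case-class-state", createTypeInformation[MyCaseClass]))

  override def readKey(key: String,
                       ctx: KeyedStateReaderFunction.Context,
                       out: Collector[NewCaseClass]): Unit = {
    val old = state.value()
    out.collect(NewCaseClass(key, old.a, old.b, e = 0)) // default the new field
  }
}

// Writes the migrated case class into the new savepoint.
class NewStateWriter extends KeyedStateBootstrapFunction[String, NewCaseClass] {
  @transient private var state: ValueState[NewCaseClass] = _

  override def open(parameters: Configuration): Unit =
    state = getRuntimeContext.getState(new ValueStateDescriptor(
      "case-class-state", createTypeInformation[NewCaseClass]))

  override def processElement(
      value: NewCaseClass,
      ctx: KeyedStateBootstrapFunction[String, NewCaseClass]#Context): Unit =
    state.update(value)
}

object MigrateCaseClassState {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val existing = Savepoint.load(env, "hdfs:///savepoints/old", new MemoryStateBackend())

    val migrated = OperatorTransformation
      .bootstrapWith(existing.readKeyedState("my-operator-uid", new OldStateReader))
      .keyBy(new KeySelector[NewCaseClass, String] {
        override def getKey(value: NewCaseClass): String = value.key
      })
      .transform(new NewStateWriter)

    Savepoint
      .create(new MemoryStateBackend(), 128) // 128 = max parallelism of the job
      .withOperator("my-operator-uid", migrated)
      .write("hdfs:///savepoints/new")

    env.execute("migrate-case-class-state")
  }
}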

Cheers,
Gordon

[1] https://issues.apache.org/jira/browse/FLINK-10896
[2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html

On Wed, Mar 18, 2020 at 1:04 PM Apoorv Upadhyay <
apoorv.upadh...@razorpay.com> wrote:

> Thanks a lot. Also, can you share an example where this has been
> implemented? I have gone through the docs, but it still does not work for
> me.
>
> On Wed, Feb 26, 2020 at 7:59 PM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
>> Hi Apoorv,
>>
>> You can achieve this by implementing custom serializers for your state.
>> Please refer to
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/custom_serialization.html
>>
>> Regards,
>> Roman
>>
>>
>> On Wed, Feb 26, 2020 at 6:53 AM Apoorv Upadhyay <
>> apoorv.upadh...@razorpay.com> wrote:
>>
>>> Hi Roman,
>>>
>>> I have successfully migrated to Flink 1.8.2 with the savepoint created
>>> by Flink 1.6.2.
>>> Now I have to modify a few case classes due to a new requirement. I
>>> created a savepoint, and when I run the app with the modified classes
>>> from that savepoint, it throws the error "state not compatible".
>>> Previously no serializers were used.
>>> I now wish to support state schema evolution, so I need suggestions on
>>> how I can achieve that.
>>>
>>> Regards
>>>
>>> On Tue, Feb 25, 2020 at 9:08 PM Khachatryan Roman <
>>> khachatryan.ro...@gmail.com> wrote:
>>>
>>>> Hi ApoorvK,
>>>>
>>>> I understand that you have a savepoint created by Flink 1.6.2 and you
>>>> want to use it with Flink 1.8.2. The classes themselves weren't modified.
>>>> Is that correct?
>>>> Which serializer did you use?
>>>>
>>>> Regards,
>>>> Roman
>>>>
>>>>
>>>> On Tue, Feb 25, 2020 at 8:38 AM ApoorvK <apoorv.upadh...@razorpay.com>
>>>> wrote:
>>>>
>>>>> Hi Team,
>>>>>
>>>>> Earlier we developed on Flink 1.6.2, so there are lots of case classes
>>>>> that contain Maps and nested case classes, for example:
>>>>>
>>>>> case class MyCaseClass(var a: Boolean,
>>>>>                        var b: Boolean,
>>>>>                        var c: Boolean,
>>>>>                        var d: NestedCaseClass,
>>>>>                        var e: Int) {
>>>>>   def this() { this(false, false, false, new NestedCaseClass(), 0) }
>>>>> }
>>>>>
>>>>>
>>>>> Now that we have migrated to Flink 1.8.2, I need help figuring out how
>>>>> I can achieve state schema evolution for such classes.
>>>>>
>>>>> 1. Will creating Avro schemas for these classes and implementing Avro
>>>>> serialisation work?
>>>>> 2. Or should I register a Kryo serializer with a Protobuf serializer on
>>>>> the environment?
>>>>>
>>>>> Please suggest what can be done here, or point me to an Avro
>>>>> serialisation example.
>>>>>
>>>>> Thanks
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
