Hi Apoorv, Flink currently does not natively support schema evolution for state types using Scala case classes [1].
So, as Roman has pointed out, there are 2 possible ways for you to do that: - Implementing a custom serializer that support schema evolution for your specific Scala case classes, as Roman suggested. - or, using the State Processor API [2] to migrate your case classes offline as a batch job For your question on how to implement a schema-evolution supporting serializer, can you share with me the problems you have met so far? Otherwise, if you take a look at the PojoSerializerSnapshot class, that would be a starting point to implement something similar for your case classes. As you will quickly realize, it's not simple, so I would strongly suggest trying out the approach of using the State Processor API. Either way, if you bump into any problems, feel free to let me know. Cheers, Gordon [1] https://issues.apache.org/jira/browse/FLINK-10896 [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html On Wed, Mar 18, 2020 at 1:04 PM Apoorv Upadhyay < apoorv.upadh...@razorpay.com> wrote: > Thanks a lot , Also can you share one example where these has been > implemented? I have gone through docs does not happen to work still > > On Wed, Feb 26, 2020 at 7:59 PM Khachatryan Roman < > khachatryan.ro...@gmail.com> wrote: > >> Hi Apoorv, >> >> You can achieve this by implementing custom serializers for your state. >> Please refer to >> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/custom_serialization.html >> >> Regards, >> Roman >> >> >> On Wed, Feb 26, 2020 at 6:53 AM Apoorv Upadhyay < >> apoorv.upadh...@razorpay.com> wrote: >> >>> Hi Roman, >>> >>> I have successfully migrated to flink 1.8.2 with the savepoint created >>> by flink 1.6.2. >>> Now I have to modify few case classes due to new requirement I have >>> created a savepoint and when I run the app with modified class from the >>> savepoint it throws error "state not compatible" >>> Previously there were no serializer used. >>> I now wish to support state schema Hence need suggestion how can i >>> achieve that ? >>> >>> Regards >>> >>> On Tue, Feb 25, 2020 at 9:08 PM Khachatryan Roman < >>> khachatryan.ro...@gmail.com> wrote: >>> >>>> Hi ApoorvK, >>>> >>>> I understand that you have a savepoint created by Flink 1.6.2 and you >>>> want to use it with Flink 1.8.2. The classes themselves weren't modified. >>>> Is that correct? >>>> Which serializer did you use? >>>> >>>> Regards, >>>> Roman >>>> >>>> >>>> On Tue, Feb 25, 2020 at 8:38 AM ApoorvK <apoorv.upadh...@razorpay.com> >>>> wrote: >>>> >>>>> Hi Team, >>>>> >>>>> Earlier we have developed on flink 1.6.2 , So there are lots of case >>>>> classes >>>>> which have Map,Nested case class within them for example below : >>>>> >>>>> case class MyCaseClass(var a: Boolean, >>>>> var b: Boolean, >>>>> var c: Boolean, >>>>> var d: NestedCaseClass, >>>>> var e:Int){ >>>>> def this(){this(false,false,new NestedCaseClass,0)} >>>>> } >>>>> >>>>> >>>>> Now we have migrated to flink 1.8.2 , I need help to figure out how >>>>> can I >>>>> achieve state schema evolution for such classes. >>>>> >>>>> 1. Is creating avro for these classes now, and implement avro >>>>> serialisation >>>>> will that work ? >>>>> 2. Or if I register kyroserialiser with protobuf serialiser at env? >>>>> >>>>> Please suggest what can be done here, or redirect for the avros >>>>> serialisation example. >>>>> >>>>> Thanks >>>>> >>>>> >>>>> >>>>> >>>>> -- >>>>> Sent from: >>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ >>>>> >>>>