Hi Dhvanan,

Could you provide us with a small example program that reproduces the problem?
I've pulled in Gordon, who might know more about the limitations of Avro
schema evolution.
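Something shaped like the following would already help. This is just a sketch
of the kind of job I mean, where SessionEvent is a hypothetical stand-in POJO
for your Avro-generated class and the checkpoint path is a placeholder:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class SessionJobSketch {

    // Hypothetical stand-in for the Avro-generated
    // com.rfk.dataplatform.avro.SessionEvent.
    public static class SessionEvent {
        public String sessionId;
        public long count;

        public SessionEvent() {}

        public SessionEvent(String sessionId, long count) {
            this.sessionId = sessionId;
            this.count = count;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder path; the point is only that the window state lives in RocksDB.
        env.setStateBackend(new RocksDBStateBackend("file:///tmp/checkpoints"));

        DataStream<SessionEvent> events = env.fromElements(
                new SessionEvent("a", 1),
                new SessionEvent("a", 2),
                new SessionEvent("b", 1));

        // The reduce keeps one SessionEvent per key and window in RocksDB state;
        // that stored object is what has to be deserialized again when the job
        // resumes from a savepoint.
        events.keyBy("sessionId")
              .window(ProcessingTimeSessionWindows.withGap(Time.minutes(30)))
              .reduce((ReduceFunction<SessionEvent>) (a, b) ->
                      new SessionEvent(a.sessionId, a.count + b.count))
              .print();

        env.execute("session-job-sketch");
    }
}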
Cheers,
Till

On Wed, Apr 4, 2018 at 2:36 AM, Dhvanan Shah <dhvanan.s...@reflektion.com> wrote:

> Hello!
>
> I have a Flink job that reads an Event JSON stream and converts it to Avro
> format, and another job that reads this Event Avro stream, does some
> processing, and converts it to a Session Avro object.
> There is a schema change in both the Event and the Session Avro objects,
> and I want to restart both jobs from a savepoint with this change.
> I am able to start the first job (the one that reads JSON data) from its
> savepoint, but the second job, which reads the Avro objects produced by
> the first job (whose schema has changed), is not able to resume from its
> savepoint and runs into this RocksDB error:
>
> java.lang.RuntimeException: Error while adding data to RocksDB
> Caused by: com.esotericsoftware.kryo.KryoException:
>     java.lang.IndexOutOfBoundsException: Index: 115, Size: 3
>
> Root exception:
>
> java.lang.RuntimeException: Error while adding data to RocksDB
>     at org.apache.flink.contrib.streaming.state.RocksDBReducingState.add(RocksDBReducingState.java:116)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:409)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 115, Size: 3
> Serialization trace:
> brand (com.rfk.dataplatform.avro.ProductData)
> event_data (com.rfk.dataplatform.avro.SessionEvent)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
>     at org.apache.flink.contrib.streaming.state.RocksDBReducingState.add(RocksDBReducingState.java:109)
>     ... 6 more
> Caused by: java.lang.IndexOutOfBoundsException: Index: 115, Size: 3
>     at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>     at java.util.ArrayList.get(ArrayList.java:429)
>     at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>     at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
>     ... 17 more
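A note on the trace above: PojoSerializer hands the `event_data` field off to
Kryo's FieldSerializer, i.e. the bytes in the savepoint for that field were
written by Kryo rather than Avro, and Kryo's FieldSerializer generally cannot
read objects back once fields have been added or removed. A quick way to see
which serializer Flink picks for a class, again just a sketch reusing the
hypothetical SessionEvent stand-in from the program above:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class SerializerCheck {
    public static void main(String[] args) {
        // SessionEvent is the same hypothetical stand-in class as above.
        TypeInformation<SessionJobSketch.SessionEvent> info =
                TypeInformation.of(SessionJobSketch.SessionEvent.class);
        // If this prints a GenericTypeInfo / KryoSerializer pair, Flink falls
        // back to Kryo for the type, and state written for it will not
        // tolerate schema changes.
        System.out.println(info);
        System.out.println(info.createSerializer(new ExecutionConfig()));
    }
}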
>
> I saw a Kryo serialization error, so I tried to force Avro serialization
> via `env.getConfig().enableForceAvro()`, but this didn't work either and
> gave the following error:
>
> Root exception:
>
> java.lang.RuntimeException: Error while adding data to RocksDB
>     at org.apache.flink.contrib.streaming.state.RocksDBReducingState.add(RocksDBReducingState.java:116)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:409)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ArrayIndexOutOfBoundsException
>
> avro_events_to_raw_user_session_transformer -> (Sink: raw_user_sessions_s3_sink,
> Flat Map -> FlinKafkaProducer 0.10.x, Flat Map -> FlinKafkaProducer 0.10.x,
> raw_session_to_user_session_aggregate_transformer -> FlinKafkaProducer 0.10.x,
> Flat Map -> FlinKafkaProducer 0.10.x, Session-id-information -> FlinKafkaProducer 0.10.x) (1/1)
> ip-10-20-9-32.ec2.internal:37587
>
> java.lang.RuntimeException: Error while adding data to RocksDB
>     at org.apache.flink.contrib.streaming.state.RocksDBReducingState.add(RocksDBReducingState.java:116)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:409)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ArrayIndexOutOfBoundsException
>
> It would be really helpful if someone could point out what I might be doing
> wrong, or how I could go about this, as I feel this is a very common
> situation: you want to restart some jobs from a savepoint with schema
> changes. Any help would be appreciated!
>
> Cheers,
> Dhvanan Shah
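For completeness, a minimal sketch of where that flag goes (a hypothetical
setup, not your actual job):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ForceAvroSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Prefer Avro over Kryo when Flink has to serialize generic types.
        // This decides how the newly started job serializes data; it does not
        // change the bytes that were already written into the savepoint by Kryo.
        env.getConfig().enableForceAvro();

        env.fromElements("a", "b", "c").print();
        env.execute("force-avro-sketch");
    }
}

Note that, as far as I can tell, this is also why setting the flag does not
help here: the state already in the savepoint was written by Kryo with the old
schema, so reading it back can still fail regardless of the new configuration.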