Hi Dhvanan,

could you provide us with a small example program that reproduces the
problem? I've pulled in Gordon, who might know more about the limitations
of Avro schema evolution.

Cheers,
Till

On Wed, Apr 4, 2018 at 2:36 AM, Dhvanan Shah <dhvanan.s...@reflektion.com>
wrote:

> Hello!
>
> I have a Flink job that reads an Event JSON stream and converts it to Avro
> format, and a second job that reads this Event Avro stream, does some
> processing, and converts it into a Session Avro object (a rough sketch of
> the second job is below).
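>
> The second job keys the incoming Avro events and reduces them into a
> Session accumulator inside a session window, so the accumulator is held in
> RocksDB reducing state. A minimal sketch of what I mean (all identifiers
> here are simplified placeholders, not the real code):
>
>     // imports omitted; uses flink-streaming-java,
>     // flink-connector-kafka-0.10 and flink-statebackend-rocksdb
>     StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setStateBackend(new RocksDBStateBackend(checkpointUri));
>
>     env.addSource(new FlinkKafkaConsumer010<>("avro-events",
>                     eventDeserializationSchema, kafkaProps))
>        // wrap each SessionEvent in a Session accumulator (placeholder)
>        .map(e -> newSessionFor(e))
>        .keyBy(s -> s.getSessionId())
>        .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
>        // this reduce is backed by the RocksDBReducingState that fails
>        // to restore in the stack traces below
>        .reduce((a, b) -> mergeSessions(a, b))
>        .addSink(sessionSink);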
> The schema of both the Event and the Session Avro objects has changed, and
> I want to restart both jobs from a savepoint with the new schema.
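>
> The restart itself is the standard savepoint-and-resume flow via the Flink
> CLI (job ids, paths and the jar name are placeholders):
>
>     # trigger a savepoint for the running job, then cancel the job
>     bin/flink savepoint <jobId>
>     bin/flink cancel <jobId>
>     # resubmit the job built with the new schema, resuming from the savepoint
>     bin/flink run -s <savepointPath> job-with-new-schema.jar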
> I am able to resume the first job (the one that reads the JSON data) from
> its savepoint, but the second job, which reads the Avro objects whose
> schema has changed, is not able to resume from its savepoint and runs into
> this RocksDB error -
>
> *java.lang.RuntimeException: Error while adding data to RocksDB*
> *Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 115, Size: 3*
>
> Error:
> Root exception
> java.lang.RuntimeException: Error while adding data to RocksDB
>     at org.apache.flink.contrib.streaming.state.RocksDBReducingState.add(RocksDBReducingState.java:116)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:409)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 115, Size: 3
> Serialization trace:
> brand (com.rfk.dataplatform.avro.ProductData)
> event_data (com.rfk.dataplatform.avro.SessionEvent)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
>     at org.apache.flink.contrib.streaming.state.RocksDBReducingState.add(RocksDBReducingState.java:109)
>     ... 6 more
> Caused by: java.lang.IndexOutOfBoundsException: Index: 115, Size: 3
>     at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>     at java.util.ArrayList.get(ArrayList.java:429)
>     at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>     at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
>     ... 17 more
>
> The serialization trace shows Kryo (rather than Avro) deserializing the
> state, so I tried to force Avro serialization. The only change is one line
> on the execution config:
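>
>     // force Flink to serialize Avro-generated types with its
>     // AvroSerializer instead of falling back to Kryo
>     StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>     env.getConfig().enableForceAvro();
>
> This too didn't work and gave the following error -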
>
> Root exception
> java.lang.RuntimeException: Error while adding data to RocksDB
>     at org.apache.flink.contrib.streaming.state.RocksDBReducingState.add(RocksDBReducingState.java:116)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:409)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ArrayIndexOutOfBoundsException
> avro_events_to_raw_user_session_transformer -> (Sink: raw_user_sessions_s3_sink,
>   Flat Map -> FlinKafkaProducer 0.10.x, Flat Map -> FlinKafkaProducer 0.10.x,
>   raw_session_to_user_session_aggregate_transformer -> FlinKafkaProducer 0.10.x,
>   Flat Map -> FlinKafkaProducer 0.10.x,
>   Session-id-information -> FlinKafkaProducer 0.10.x) (1/1)
> ip-10-20-9-32.ec2.internal:37587
> java.lang.RuntimeException: Error while adding data to RocksDB
>     (same stack trace as the root exception above)
> Caused by: java.lang.ArrayIndexOutOfBoundsException
>
>
> It would be really helpful if someone could point out what I might be
> doing wrong, or how I should go about this. Restarting jobs from a
> savepoint after a schema change seems like a very common situation. Any
> help would be appreciated!
>
> Cheers,
> Dhvanan Shah
>
