RocksDB Serialization issue

2020-03-04 Thread David Morin
Hello, I have this exception in my DataStream app and I can't find the root cause. I consume data from Kafka, and it fails when I try to get a value from my MapState in RocksDB. It was working in a previous release of my app. java.lang.ArrayIndexOutOfBounds
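
A minimal sketch, with hypothetical names (the actual job code is not shown in the thread), of the kind of lookup described: with the RocksDB backend, MapState#get deserializes the stored bytes on access, so a corrupted or incompatible entry surfaces exactly at that call as the error reported above.

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    // Hypothetical keyed lookup function; runs on a stream keyed upstream with keyBy().
    public class CacheLookup extends RichFlatMapFunction<String, String> {

        private transient MapState<String, String> cache;

        @Override
        public void open(Configuration parameters) {
            cache = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("cache", String.class, String.class));
        }

        @Override
        public void flatMap(String key, Collector<String> out) throws Exception {
            // With RocksDB, the stored value bytes are deserialized here; this is
            // where the ArrayIndexOutOfBounds error reported above would be thrown.
            String value = cache.get(key);
            if (value != null) {
                out.collect(value);
            }
        }
    }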

Re: RocksDB Serialization issue

2020-03-04 Thread Arvid Heise
Hi David, the obvious reason is that your state stored an enum value that is not present anymore. It tries to deserialize the 512th entry of your enum, which is not available. However, since it's highly unlikely that you actually have that many enum values in the same enum class, we are actually loo
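
For illustration only (this is not Flink's serializer code), a minimal sketch of the failure mode being described: an enum persisted as an index blows up with java.lang.ArrayIndexOutOfBoundsException when the stored index does not correspond to a constant of the current enum class. As noted above, 512 constants in one enum is unlikely, so the index itself is probably bogus rather than pointing at a genuinely removed constant.

    import java.nio.ByteBuffer;

    public class EnumIndexDemo {

        // Hypothetical enum with only three constants.
        enum Color { RED, GREEN, BLUE }

        static Color deserialize(ByteBuffer in) {
            int index = in.getInt();
            // If the bytes were written against a different schema, or are read at the
            // wrong offset, the index can point past the end of values() and this
            // lookup throws java.lang.ArrayIndexOutOfBoundsException.
            return Color.values()[index];
        }

        public static void main(String[] args) {
            ByteBuffer corrupted = ByteBuffer.allocate(4);
            corrupted.putInt(512);   // the "512th entry" from the error above
            corrupted.flip();
            try {
                deserialize(corrupted);
            } catch (ArrayIndexOutOfBoundsException e) {
                System.out.println("No enum constant at index 512: " + e);
            }
        }
    }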

Re: RocksDB Serialization issue

2020-03-05 Thread David Morin
Hello Arvid, After some investigation with the help of my colleague, we finally found the root cause. In order to speed up the initialization of the state, I've created some threads to parallelize the reading of the bucket files. This is a temporary solution because I plan to use the State Processor API. Here
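
A minimal sketch of the pattern described above, with hypothetical names (DeltaSink, listBucketFiles, readRows): the first record triggers a parallel load of the bucket files into MapState from several worker threads. Flink state, and the serializers the RocksDB backend shares behind it, is not meant to be touched from threads other than the task thread, so concurrent access like this can corrupt the serialized entries and later show up as the ArrayIndexOutOfBounds error when the state is read back.

    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class DeltaSink extends RichSinkFunction<String> {

        private transient MapState<String, String> cache;
        private transient boolean initialized;

        @Override
        public void open(Configuration parameters) {
            cache = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("cache", String.class, String.class));
        }

        @Override
        public void invoke(String record, Context context) throws Exception {
            if (!initialized) {
                ExecutorService pool = Executors.newFixedThreadPool(4);
                for (String bucketFile : listBucketFiles()) {
                    pool.submit(() -> {
                        try {
                            for (Map.Entry<String, String> row : readRows(bucketFile)) {
                                // Unsafe: MapState is written from a worker thread while the
                                // task thread may also use it; the serializers are shared.
                                cache.put(row.getKey(), row.getValue());
                            }
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    });
                }
                pool.shutdown();
                pool.awaitTermination(1, TimeUnit.HOURS);
                initialized = true;
            }
            // ... apply the incoming Kafka record against the initialized state ...
        }

        // Hypothetical stubs standing in for listing and reading the ORC bucket files.
        private List<String> listBucketFiles() { return List.of(); }
        private List<Map.Entry<String, String>> readRows(String file) { return List.of(); }
    }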

Re: RocksDB Serialization issue

2020-03-05 Thread Arvid Heise
Hi David, could you please explain what you are actually trying to achieve? It seems like you are reading some files from S3 in SinkFunction#open and putting them into state (bootstrapping?). How many instances of the sink are executed? How do you shard the buckets, e.g. how do you avoid reading th

Re: RocksDB Serialization issue

2020-03-05 Thread David Morin
Yes Arvid, the sink is keyed by a String dbName::tableName. Kafka is the input, but to initialize the state we have to read Hive delta files before consuming the Kafka records. These are ORC files that we read to initialize the state, with one directory per table. A key (primary key) is only in one bucket file. So
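
To make the keying concrete, a minimal sketch assuming a hypothetical pipe-separated record format and the DeltaSink sketched in the previous message: records are keyed by the dbName::tableName string before the sink, so all records of a table share one key, and that key's state can be initialized from the table's ORC directory.

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class PipelineSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Stand-in for the Kafka source; each change record carries its database
            // and table name (encoded here as a pipe-separated string for brevity).
            DataStream<String> changes = env.fromElements("db1|table1|pk1|payload");

            changes
                // Key by "dbName::tableName" so every record of a table goes to one key.
                .keyBy(line -> line.split("\\|")[0] + "::" + line.split("\\|")[1])
                .addSink(new DeltaSink());

            env.execute("delta-apply");
        }
    }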