Hi,
IIUC, the conditions to reproduce it are:
1. Using RocksDBStateBackend with the incremental strategy
2. Using ListState in the stateful operator
3. Enabling TTL with cleanupInRocksdbCompactFilter
4. Adding a field to the POJO so that the job triggers schema evolution
Then the exception will be thrown, right?
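
By the way, the "Caused by" in your trace is plain Java reflection behavior: a primitive int field can never be set to null, which is what PojoSerializer ends up doing when it deserializes old-format bytes that are missing the new field. A minimal stdlib-only sketch (the Event class and hr field are hypothetical stand-ins for your POJO):

```java
import java.lang.reflect.Field;

// Hypothetical stand-in for the evolved POJO: the newly added primitive
// int field cannot hold null.
class Event {
    int hr;
}

public class NullIntoIntField {
    public static void main(String[] args) throws Exception {
        Field hr = Event.class.getDeclaredField("hr");
        hr.setAccessible(true);
        try {
            // Reflectively setting a primitive int field to null throws,
            // producing the same "Can not set int field ... to null value"
            // message seen in the reported stack trace.
            hr.set(new Event(), null);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```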

As for the next question about why the value is not updated in RocksDB:
where did you add the debugging log? In RocksDBListState#get()?
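
For context on why the byte length matters: the compaction filter (RocksDbTtlCompactFiltersManager$ListElementFilter) walks the raw list value, using the element serializer to find where each element ends and where its appended last-access timestamp sits. If the bytes were written with the old schema but parsed assuming the new, wider one, the offsets drift and deserialization eventually runs off the end of the value. A stdlib-only sketch with fixed-width elements (a simplification; real POJO serialization is variable-width):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class TtlListWalk {
    static final byte DELIMITER = ',';

    // Serialize a list the way TTL list state lays it out: each element's
    // payload followed by an 8-byte last-access timestamp, elements separated
    // by a one-byte delimiter. elementWidth 4 = old schema, 8 = new schema.
    static byte[] writeList(int[] values, long ts, int elementWidth) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        for (int i = 0; i < values.length; i++) {
            if (i > 0) out.write(DELIMITER);
            out.writeInt(values[i]);
            if (elementWidth == 8) out.writeInt(0); // the newly added field
            out.writeLong(ts);
        }
        return bos.toByteArray();
    }

    // Walk the value assuming every element has the given payload width,
    // collecting the per-element timestamps (what the filter compares against
    // the TTL). With the wrong width the offsets drift into garbage.
    static List<Long> readTimestamps(byte[] bytes, int elementWidth) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        List<Long> timestamps = new ArrayList<>();
        while (in.available() > 0) {
            in.skipBytes(elementWidth);              // skip the element payload
            timestamps.add(in.readLong());           // read its timestamp
            if (in.available() > 0) in.skipBytes(1); // skip the delimiter
        }
        return timestamps;
    }

    public static void main(String[] args) throws Exception {
        byte[] oldFormat = writeList(new int[]{1, 2, 3}, 1000L, 4);
        // Matching width: timestamps come back cleanly.
        System.out.println(readTimestamps(oldFormat, 4)); // → [1000, 1000, 1000]
        // New (wider) schema applied to old-format bytes: the walk drifts and
        // runs off the end, analogous to the "Failed to deserialize list
        // element for TTL compaction filter" error.
        try {
            System.out.println(readTimestamps(oldFormat, 8));
        } catch (EOFException e) {
            System.out.println("failed to deserialize list element (ran past end of value)");
        }
    }
}
```

So if old-format bytes survive in SST files (which incremental checkpoints keep around), the filter parsing them with the new serializer is exactly the failure mode above.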

Best,
Hangxiang.

On Wed, Aug 3, 2022 at 5:32 PM tao xiao <xiaotao...@gmail.com> wrote:

> Hi team,
>
> I encountered below exception after I added a new field to a POJO used in
> list state and resumed the job from checkpoint
>
>
>
>> # A fatal error has been detected by the Java Runtime Environment:
>> #
>> #  SIGSEGV (0xb) at pc=0x00007f6a2ad5d95f, pid=1, tid=0x00007f69bac9f700
>> #
>> # JRE version: OpenJDK Runtime Environment (8.0_302-b08) (build 1.8.0_302-b08)
>> # Java VM: OpenJDK 64-Bit Server VM (25.302-b08 mixed mode linux-amd64 compressed oops)
>> # Problematic frame:
>> # V  [libjvm.so+0x57595f]  Exceptions::_throw(Thread*, char const*, int, Handle, char const*)+0x1ef
>> #
>> # Core dump written. Default location: /opt/flink/core or core.1
>> #
>> # An error report file with more information is saved as:
>> # /opt/flink/hs_err_pid1.log
>> #
>> # If you would like to submit a bug report, please visit:
>> #   http://bugreport.java.com/bugreport/crash.jsp
>> #
>> [error occurred during error reporting, id 0xb]
>>
>>
>> Exception in thread "Thread-23"
>> org.apache.flink.util.FlinkRuntimeException: Failed to deserialize list
>> element for TTL compaction filter
>> at
>> org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextUnexpiredOffset(RocksDbTtlCompactFiltersManager.java:214)
>> Caused by: java.lang.IllegalArgumentException: Can not set int field
>> xxxxx.hr to null value
>> at
>> sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:167)
>> at
>> sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:171)
>> at
>> sun.reflect.UnsafeIntegerFieldAccessorImpl.set(UnsafeIntegerFieldAccessorImpl.java:80)
>> at java.lang.reflect.Field.set(Field.java:764)
>> at
>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:412)
>> at
>> org.apache.flink.api.common.typeutils.CompositeSerializer.deserialize(CompositeSerializer.java:156)
>> at
>> org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextElementLastAccessTimestamp(RocksDbTtlCompactFiltersManager.java:243)
>> at
>> org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextUnexpiredOffset(RocksDbTtlCompactFiltersManager.java:193)
>>
>
> I verified that Flink recognized the state change
>
> Performing state migration for state ListStateDescriptor{name=novimp,
>> defaultValue=null,
>> serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@6fe249e5}
>> because the state serializer's schema, i.e. serialization format, has
>> changed.
>>
>
> and successfully migrated the state with the new POJO class (I added my own
> debugging log to the Flink source code)
>
> before migration object size: *480*; after migration object size: *481*
>
>
> However, when the exception occurred, the object read from state had the
> original byte length, not the updated one. It appears that the state
> migration during the state recovery phase didn't take effect or wasn't
> persisted to RocksDB. Can you please give me some pointers to debug this
> problem further?
>
>> object size *480*
>>
>
>
> Flink version is 1.13.2
> RocksDB state backend with incremental, aligned checkpoints
> List state with TTL (24h) enabled
>
> --
> Regards,
> Tao
>
