Hello!

I’m trying to have a process with a cache (using guava) and following this
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/process_function.html

But when I run it I get the following exception:


com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException

Serialization trace:

localCache 
(com.criteo.internal.shaded.com.google.common.cache.LocalCache$LocalManualCache)

        at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)

        at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)

        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)

        at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:272)

        at 
org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:83)

        at com.criteo.flink.CachedOutput.processElement(CachedOutput.scala:28)

        at com.criteo.flink.CachedOutput.processElement(CachedOutput.scala:16)

        at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:94)

        at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)

        at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)

        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)

        at java.lang.Thread.run(Thread.java:748)

Caused by: java.lang.NullPointerException

        at 
com.criteo.internal.shaded.com.google.common.cache.LocalCache.hash(LocalCache.java:1839)

        at 
com.criteo.internal.shaded.com.google.common.cache.LocalCache.put(LocalCache.java:4148)

        at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:144)

        at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)

        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)

        at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)

Which I guess it’s a problem trying to serialize the state to update it?
Do you know how I could solve this?

Btw I’m trying to not output records already processed while using a sliding 
window.

Thanks!
Juan

Reply via email to