Hi,
For the RocksDB state backend, it will pick up the registered Kryo serializer for both regular read/write access and checkpoint/restore. Moreover, since key-values are serialized before being stored in RocksDB, it effectively deep-copies them, which avoids unexpected modification later.
For the FileSystem/HashMap state backend,
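To illustrate why a deep copy through serialization matters for a transient field, here is a minimal JDK-only sketch. It uses plain Java serialization as a stand-in for the state backend's serializer; the class and field names are invented for illustration and are not Flink code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class DeepCopyDemo {
    // Stand-in for a state class like MultiStorePacketState.
    public static class State implements Serializable {
        public transient Object currentFile; // skipped by serialization
        public String fileName = "store-0.pcap";
    }

    // Deep copy via a serialize/deserialize round trip, which is
    // effectively what a serializing state backend does on access.
    public static State deepCopy(State s) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(s);
        }
        try (ObjectInputStream ois =
                 new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (State) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        State original = new State();
        original.currentFile = new Object(); // "open" handle before the copy
        State copy = deepCopy(original);
        System.out.println(copy == original);         // false: a fresh clone
        System.out.println(copy.currentFile == null); // true: transient field lost
        System.out.println(copy.fileName);            // non-transient field survives
    }
}
```

The non-transient fileName survives the round trip, but the transient handle comes back null on every copy.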
That's what I would try out, but I'm not sure if the state backend would pick that up. @Yun Tang do you know more?
On Mon, Oct 18, 2021 at 9:37 AM Alex Drobinsky wrote:
> Hi Arvid,
>
> It sounds like a good direction, do I need to register my state class with
> KryoSerializer , similar to this
Hi Arvid,
It sounds like a good direction. Do I need to register my state class with KryoSerializer, similar to this?
env.getConfig().registerTypeWithKryoSerializer(
    IPSessionOrganizer.proto.SourceOutput.class, ProtobufSerializer.class);
On Mon, Oct 18, 2021 at 10:32 AM, Arvid Heise wrote:
> Hi Alex,
Hi Alex,
could you also log the identity hashcode (or something similar) of the
related instance? I suspect that it's not the field that is set to null, but
that you get a clone where the field is null. In that case, you need to add
a specific KryoSerializer to initialize it (or just go with a
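One way to check the clone hypothesis suggested above is a rough JDK-only sketch: log System.identityHashCode before and after a serialization round trip (plain Java serialization stands in for the backend's copy mechanism; names are invented).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class IdentityLogDemo {
    public static class State implements Serializable {
        public transient Object currentFile = new Object();
    }

    // Serialize/deserialize round trip: returns a distinct clone.
    public static Object roundTrip(Object o) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        try (ObjectInputStream ois =
                 new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        State before = new State();
        State after = (State) roundTrip(before);
        // If these differ, the state access handed you a clone,
        // not the instance whose field you originally set.
        System.out.println("before=" + System.identityHashCode(before));
        System.out.println("after=" + System.identityHashCode(after));
        System.out.println(after.currentFile == null); // true on the clone
    }
}
```

If the logged hashcodes differ between writing the state and reading it back, the null field is a symptom of cloning, not of anything setting it to null.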
Hi Jing,
The job doesn't restart from a checkpoint; it's a brand new clean job, no
exceptions happened during execution, no restarts :)
The state is a keyed state, so a new key means a new state. In this
situation currentFile is equal to null, as expected, and that is handled without
issues.
Before I
Hi Alex,
Since you use `FileSystemStateBackend`, I think currentFile becoming
null once in a while is not caused by the periodic checkpoint,
because if the job is running without failover or restore from a checkpoint,
reading/writing value state on `FileSystemStateBackend` does not cause
serialization and
Hi Alex,
Since you use the customized MultiStorePacketState class as the value state type,
it should use the Kryo serializer [1] to serialize your class when accessing RocksDB
state or when checkpointing via FileSystemStateBackend, and I don't know whether Kryo
would serialize your transient field.
If you're
It would be difficult to provide even a semblance of the complete product;
however, I could try to provide enough details to reproduce the problem.
A standard source would do:
DataStream stream = env.addSource(
    new FlinkKafkaConsumer<>(topic, new
        AbstractDeserializationSchema() {
Hi Alex,
It is a little weird.
Would you please provide a program which could reproduce the problem,
including the DataStream job code and the related classes' code? I need to do
some debugging to find out the reason.
Best,
JING ZHANG
Alex Drobinsky wrote on Mon, Oct 11, 2021 at 5:50 PM:
> Hi Jing Zhang,
>
> I'm using the
Hi Jing Zhang,
I'm using the FileSystem backend. I also implemented a readObject method to
support a proper restart procedure:
private void readObject(ObjectInputStream ois)
        throws ClassNotFoundException, IOException {
    ois.defaultReadObject();
    logger.info("Deserialized
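If the clone theory holds, the same readObject hook can also re-create the transient handle after deserialization. Below is a JDK-only sketch of that idea; a StringBuilder stands in for RandomAccessFile so it runs anywhere, and all names are invented for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ReopenDemo {
    public static class State implements Serializable {
        public transient StringBuilder currentFile; // stand-in for RandomAccessFile
        public String fileName = "session-1.pcap";

        private void readObject(ObjectInputStream ois)
                throws ClassNotFoundException, IOException {
            ois.defaultReadObject();
            // Re-create the transient handle from the serialized fileName,
            // e.g. reopening the file a RandomAccessFile pointed at.
            currentFile = new StringBuilder(fileName);
        }
    }

    // Serialize/deserialize round trip to exercise the readObject hook.
    public static State roundTrip(State s) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(s);
        }
        try (ObjectInputStream ois =
                 new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (State) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        State restored = roundTrip(new State());
        System.out.println(restored.currentFile); // re-initialized, not null
    }
}
```

One caveat: this hook is only invoked by Java serialization. If the backend actually serializes the class with Kryo's default field serializer, readObject will not be called, which may be relevant to the behavior discussed in this thread.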
Hi, Alex
What state backend do you choose?
If you choose MemoryStateBackend or FsStateBackend, the `transient` keyword may
have no effect, because these backends do not serialize state for
regular read/write accesses but keep it as objects on the heap.
If you choose RocksDBStateBackend, I
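As a toy analogy for the heap-backend behavior described above (a HashMap standing in for heap state, not Flink code): when state is kept as objects on the heap, a read returns the very same instance that was written, so a transient field keeps its value between accesses.

```java
import java.util.HashMap;
import java.util.Map;

public class HeapStateDemo {
    public static class State {
        public transient Object currentFile = new Object();
    }

    public static void main(String[] args) {
        // Stand-in for a heap-based state backend's per-key storage.
        Map<String, State> heapState = new HashMap<>();
        State written = new State();
        heapState.put("key-1", written);

        State read = heapState.get("key-1");
        System.out.println(read == written);          // true: same object, no serialization
        System.out.println(read.currentFile == null); // false: transient value preserved
    }
}
```

This is why switching between a heap backend and RocksDB can change whether a transient field "survives" between state accesses.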
Dear flink community,
I have the following state class (irrelevant fields removed):
public class MultiStorePacketState implements Serializable {
    public transient RandomAccessFile currentFile = null;
    public long timerValue;
    public String fileName;
    public String exportedFileName;