Hi Arvid,

That sounds like a good direction. Do I need to register my state class with
KryoSerializer, similar to this?

env.getConfig().registerTypeWithKryoSerializer(IPSessionOrganizer.proto.SourceOutput.class,
ProtobufSerializer.class);



On Mon, Oct 18, 2021 at 10:32 AM Arvid Heise <ar...@apache.org> wrote:

> Hi Alex,
>
> could you also log the identity hash code (or something similar) of the
> related instance? I suspect that it's not the field that is set to null, but
> that you get a clone where the field is null. In that case, you need to add
> a specific KryoSerializer to initialize it (or just go with a lazy access
> pattern all the way).
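>
> A minimal sketch of the kind of logging I mean (assuming `so` is the value
> read from state in processElement, and LOG is an slf4j logger in your
> function):
>
> MultiStorePacketState so = state.value();
> LOG.info("state identity={}, currentFile={}",
>         System.identityHashCode(so), so.currentFile);
>
> If the identity hash code changes between accesses while the key stays the
> same, you are looking at a clone rather than the original instance.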
>
> On Tue, Oct 12, 2021 at 2:55 PM Alex Drobinsky <alex.drobin...@gmail.com>
> wrote:
>
>> Hi Jing,
>>
>> The job doesn't restart from a checkpoint; it's a brand-new clean job, no
>> exceptions happened during execution, no restarts :)
>> The state is keyed state, so a new key means a new state - in that
>> situation currentFile is equal to null, as expected, and it is handled
>> without issues.
>> Before I even thought to ask these questions, the first thing I did was
>> add log messages with the value of currentFile in every place where it
>> could be changed.
>> So I verified that before I return control to Flink, currentFile has the
>> correct value, and after I receive the value from state in the next
>> iteration it is set to null.
>> The checkpoints by themselves could be irrelevant to the problem; the only
>> indication of a connection is my observation that the interval between the
>> first event and the first occurrence of the nullification is exactly the
>> same as the checkpoint interval.
>>
>> Yun Tang - you are correct, it's a KryoSerializer: if I remove the
>> "transient" qualifier from currentFile, it crashes inside KryoSerializer
>> because RandomAccessFile isn't serializable.
>> This also points to the fact that serialization was actually executed at
>> least once. I will try an alternative approach - I will add my own
>> writeObject implementation; it should work :)
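>>
>> Something along these lines (just a sketch; it assumes fileOffset is all
>> that is needed to restore the file position, and uses getFilePointer() to
>> capture it before default serialization):
>>
>> private void writeObject(ObjectOutputStream oos) throws IOException {
>>     // Capture the current position of the (transient) file so that
>>     // readObject can re-open it and seek back to the same offset.
>>     if (currentFile != null) {
>>         fileOffset = currentFile.getFilePointer();
>>     }
>>     oos.defaultWriteObject();
>> }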
>>
>> Best regards,
>> Alex
>>
>>
>>
>>
>>
>>
>> On Tue, Oct 12, 2021 at 3:07 PM JING ZHANG <beyond1...@gmail.com> wrote:
>>
>>> Hi Alex,
>>> Since you use `FileSystemStateBackend`, I don't think currentFile becoming
>>> nullified once in a while is caused by the periodic checkpoint.
>>>
>>> If the job is running without failover or restore from a checkpoint,
>>> reading/writing value state on `FileSystemStateBackend` does not cause
>>> serialization or deserialization at all. I have simplified your code and
>>> verified this point. If you are interested, you can find the simplified
>>> code in the attachment of this email.
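>>>
>>> To illustrate the point (a tiny sketch, not the attached code): with a
>>> heap-based backend, value() hands back the stored object itself, so no
>>> copy is made on access.
>>>
>>> MultiStorePacketState a = state.value();
>>> state.update(a);
>>> MultiStorePacketState b = state.value();
>>> // With FileSystemStateBackend (heap state), a == b holds: the same
>>> // object reference is returned, nothing was de/serialized in between.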
>>>
>>> Some possible reasons come to my mind; I hope this helps.
>>> 1. Does the job restore from a checkpoint/savepoint? This may be caused by
>>> failover or a user-triggered stop-with-savepoint.
>>> 2. If the job does not restore from a checkpoint or savepoint:
>>>      2.1 After reading the MultiStorePacketState from ValueState, is there
>>> somewhere in your program that updates the currentFile field to null again?
>>> Because the state is stored on the heap, it may be changed if the program
>>> updates its value somewhere.
>>>      2.2 When currentFile became null, is it possible that the current key
>>> never appeared before? That is, it is the first time the current key
>>> appears, so getting the state would return the default value (a new
>>> MultiStorePacketState instance with a null currentFile); see the sketch
>>> below.
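>>>
>>> For case 2.2, this is what the descriptor's default value implies (a
>>> sketch mirroring the open() method from your code):
>>>
>>> ValueStateDescriptor<MultiStorePacketState> descriptor =
>>>       new ValueStateDescriptor<>(
>>>             "MultiStorePacketState",
>>>             TypeInformation.of(new TypeHint<MultiStorePacketState>() {}),
>>>             new MultiStorePacketState()); // default for unseen keys:
>>>                                           // currentFile == null
>>>
>>> // For a key that has never been written, state.value() returns the
>>> // default instance above, not the state left behind by some other key.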
>>>
>>> Best,
>>> JING ZHANG
>>>
>>> On Tue, Oct 12, 2021 at 4:41 PM Yun Tang <myas...@live.com> wrote:
>>>
>>>> Hi Alex,
>>>>
>>>> Since you use the customized MultiStorePacketState class as the value
>>>> state type, Flink should use the Kryo serializer [1] to serialize your
>>>> class when accessing RocksDB state or when checkpointing via
>>>> FileSystemStateBackend, and I don't know whether Kryo would serialize
>>>> your transient field.
>>>> If you're not familiar with Flink's serialization stack, I think you
>>>> could check the behaviors below (see the setup sketch after the list):
>>>>
>>>>    1. Without any checkpoint restore, use FileSystemStateBackend to
>>>>    see whether the transient field can be read as expected; the answer
>>>>    should be yes.
>>>>    2. After restoring from a checkpoint, check whether the transient
>>>>    field can be read back when using FileSystemStateBackend.
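>>>>
>>>> A minimal setup for these checks could look like this (a sketch; the
>>>> checkpoint path is a placeholder):
>>>>
>>>> StreamExecutionEnvironment env =
>>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>>> env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"));
>>>> env.enableCheckpointing(5000); // matches the 5 s interval discussed here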
>>>>
>>>>
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/serialization/types_serialization/#flinks-typeinformation-class
>>>>
>>>> Best
>>>> Yun Tang
>>>>
>>>>
>>>> ------------------------------
>>>> *From:* Alex Drobinsky <alex.drobin...@gmail.com>
>>>> *Sent:* Monday, October 11, 2021 22:37
>>>> *To:* JING ZHANG <beyond1...@gmail.com>
>>>> *Cc:* User-Flink <user@flink.apache.org>
>>>> *Subject:* Re: Reset of transient variables in state to default values.
>>>>
>>>> It would be difficult to provide even a semblance of the complete
>>>> product; however, I can try to provide enough details to reproduce the
>>>> problem.
>>>> A standard source would do:
>>>>
>>>> DataStream<byte[]> stream = env.addSource(
>>>>         new FlinkKafkaConsumer<>(topic,
>>>>                 new AbstractDeserializationSchema<byte[]>() {
>>>>                     @Override
>>>>                     public byte[] deserialize(byte[] bytes) throws IOException {
>>>>                         return bytes;
>>>>                     }
>>>>                 }, properties)).name(topic);
>>>>
>>>>
>>>> The operator body is something like:
>>>>
>>>> public class MultiStorePacketFunction extends
>>>>       KeyedProcessFunction<String, SplitterToMultiStore, ClassifierOutput>
>>>>       implements Serializable {
>>>>    private transient ValueState<MultiStorePacketState> state;
>>>>    // Presumably a metrics Counter registered in open(); its declaration
>>>>    // was elided in the original snippet.
>>>>    private transient Counter storedPackets;
>>>>
>>>>    @Override
>>>>    public void processElement(SplitterToMultiStore packet, Context ctx,
>>>>          Collector<ClassifierOutput> out) throws Exception {
>>>>       if (packet.hasPackets()) {
>>>>          storedPackets.inc(packet.getPackets().getPacketsCount());
>>>>       }
>>>>
>>>>       MultiStorePacketState so = state.value();
>>>>       if (process(packet, out, so, ctx)) {
>>>>          state.update(null);
>>>>          state.clear();
>>>>       } else {
>>>>          state.update(so);
>>>>       }
>>>>    }
>>>>
>>>>    public String generateNextFilename(String sessionKey, int partNumber) {
>>>>       String path = DirectoryService.getInstance().bookDirectory();
>>>>       return path + File.separator + sessionKey + "-" + partNumber + ".pcap";
>>>>    }
>>>>
>>>>    private void storeContent(Collector<ClassifierOutput> collector,
>>>>          MultiStorePacketState state, SplitterToMultiStore packets)
>>>>          throws Exception {
>>>>       assert (packets != null);
>>>>       assert (packets.hasPackets());
>>>>
>>>>       if (state.currentFile == null) {
>>>>          openFile(collector, state, packets);
>>>>       }
>>>>
>>>>       Utils.continueWriteToPcap(state.currentFile,
>>>>             packets.getPackets().getPacketsList());
>>>>       state.fileOffset = state.currentFile.length();
>>>>
>>>>       tryToCloseFile(collector, state);
>>>>    }
>>>>
>>>>    public static String extractExportedFileName(String fileName) {
>>>>       String[] path = fileName.split("/");
>>>>       return path[path.length - 2] + "/" + path[path.length - 1];
>>>>    }
>>>>
>>>>    private void openFile(Collector<ClassifierOutput> collector,
>>>>          MultiStorePacketState state, SplitterToMultiStore packets)
>>>>          throws Exception {
>>>>       state.fileIsOpened = true;
>>>>       state.fileName = generateNextFilename(state.sessionKey, state.partNumber);
>>>>       state.exportedFileName = extractExportedFileName(state.fileName);
>>>>
>>>>       // -> Here the RandomAccessFile is created
>>>>       state.currentFile = Utils.startWriteToPcap(state.fileName,
>>>>             packets.getPackets().getPacketsList());
>>>>       state.fileOffset = state.currentFile.length();
>>>>       state.partNumber++;
>>>>    }
>>>>
>>>>    private void tryToCloseFile(Collector<ClassifierOutput> collector,
>>>>          MultiStorePacketState state) throws IOException {
>>>>       if (state.currentFile.length() <
>>>>             StorePacketConfigurationParameters.partSizeLimit) {
>>>>          return;
>>>>       }
>>>>       closeFile(collector, state);
>>>>    }
>>>>
>>>>    private void closeFile(Collector<ClassifierOutput> collector,
>>>>          MultiStorePacketState state) throws IOException {
>>>>       state.currentFile.close();
>>>>       state.currentFile = null;
>>>>       state.fileIsOpened = false;
>>>>       ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
>>>>       outputBuilder.getUsualBuilder().setFileName(state.exportedFileName);
>>>>       outputBuilder.setSessionType(SessionType.Multi);
>>>>       outputBuilder.setSessionKey(state.sessionKey);
>>>>       var classifierOutput = outputBuilder.build();
>>>>       state.sessionMetadata.add(classifierOutput);
>>>>       collector.collect(classifierOutput);
>>>>    }
>>>>
>>>>    public boolean process(SplitterToMultiStore packet,
>>>>          Collector<ClassifierOutput> collector, MultiStorePacketState so,
>>>>          Context context) throws Exception {
>>>>
>>>>       // First message
>>>>       if (packet.hasClassificationResult()) {
>>>>          sendClassificationResult(packet, collector, so);
>>>>          return false;
>>>>       }
>>>>
>>>>       // Last message
>>>>       if (packet.hasSessionClosure()) {
>>>>          if (so.isCoverageIncorrect) {
>>>>             return true;
>>>>          }
>>>>          handleSessionClosure(packet, collector, so, context);
>>>>          return true;
>>>>       }
>>>>
>>>>       if (so.isCoverageIncorrect) {
>>>>          return false;
>>>>       }
>>>>       storeContent(collector, so, packet);
>>>>
>>>>       // The file could already be closed, e.g. if it reached the expected size.
>>>>       if (so.currentFile != null) {
>>>>          setupTimer(so, context.timerService());
>>>>       }
>>>>       return false;
>>>>    }
>>>>
>>>>    private void handleSessionClosure(SplitterToMultiStore packet,
>>>>          Collector<ClassifierOutput> collector, MultiStorePacketState so,
>>>>          Context context) throws IOException {
>>>>       if (so.currentFile != null) {
>>>>          closeFile(collector, so);
>>>>       }
>>>>       ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
>>>>       outputBuilder.setSessionKey(packet.getSessionKey());
>>>>       outputBuilder.setSessionType(packet.getSessionType());
>>>>       var messageBuilder = outputBuilder.getLastBuilder();
>>>>       messageBuilder.addAllAggregatedSession(so.sessionMetadata);
>>>>       outputBuilder.setLast(messageBuilder.build());
>>>>       var output = outputBuilder.build();
>>>>       collector.collect(output);
>>>>       context.timerService().deleteProcessingTimeTimer(so.timerValue);
>>>>       state.clear();
>>>>    }
>>>>
>>>>    private void sendClassificationResult(SplitterToMultiStore packet,
>>>>          Collector<ClassifierOutput> collector, MultiStorePacketState so) {
>>>>       var coverageResult = CoverageChecker.obtainCoverageInfo(
>>>>             packet.getClassificationResult().getLIID());
>>>>       so.isCoverageIncorrect = !coverageResult.getValue0();
>>>>       if (so.isCoverageIncorrect) {
>>>>          return;
>>>>       }
>>>>
>>>>       ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
>>>>       outputBuilder.getFirstBuilder().setClassificationResult(
>>>>             packet.getClassificationResult().getClassificationResult());
>>>>       outputBuilder.getFirstBuilder().setLIID(
>>>>             packet.getClassificationResult().getLIID());
>>>>       outputBuilder.getFirstBuilder().setCIN(
>>>>             packet.getClassificationResult().getCIN());
>>>>       outputBuilder.getFirstBuilder().setOperatorId(
>>>>             packet.getClassificationResult().getOperatorId());
>>>>       outputBuilder.getFirstBuilder().setCoverage(coverageResult.getValue1());
>>>>       outputBuilder.setSessionKey(packet.getSessionKey());
>>>>       outputBuilder.setSessionType(packet.getSessionType());
>>>>       so.sessionKey = packet.getSessionKey();
>>>>       var classifierOutput = outputBuilder.build();
>>>>       so.sessionMetadata.add(classifierOutput);
>>>>       collector.collect(classifierOutput);
>>>>    }
>>>>
>>>>    @Override
>>>>    public void open(Configuration config) {
>>>>       ValueStateDescriptor<MultiStorePacketState> descriptor =
>>>>             new ValueStateDescriptor<>(
>>>>                   "MultiStorePacketState", // the state name
>>>>                   TypeInformation.of(new TypeHint<MultiStorePacketState>() {}),
>>>>                   new MultiStorePacketState()); // default value of the
>>>>                                                 // state, if nothing was set
>>>>       state = getRuntimeContext().getState(descriptor);
>>>>       StorePacketConfigurationParameters.init();
>>>>    }
>>>>
>>>>    @Override
>>>>    public void onTimer(long timestamp, OnTimerContext ctx,
>>>>          Collector<ClassifierOutput> collector) throws Exception {
>>>>       MultiStorePacketState so = state.value();
>>>>       if (so.currentFile != null) {
>>>>          closeFile(collector, so);
>>>>       }
>>>>
>>>>       ctx.timerService().deleteProcessingTimeTimer(so.timerValue);
>>>>       state.update(so);
>>>>    }
>>>>
>>>>    private void setupTimer(MultiStorePacketState so, TimerService timerService) {
>>>>       // Cancel previous timer
>>>>       timerService.deleteProcessingTimeTimer(so.timerValue);
>>>>       // Register new timer
>>>>       so.timerValue = (timerService.currentProcessingTime()
>>>>             + StorePacketConfigurationParameters.partAggregationTimeout) / 1000 * 1000;
>>>>       timerService.registerProcessingTimeTimer(so.timerValue);
>>>>    }
>>>> }
>>>>
>>>>
>>>>
>>>> The state class looks like the following:
>>>>
>>>> public class MultiStorePacketState implements Serializable {
>>>>     private /*static*/ Logger logger =
>>>>             LoggerFactory.getLogger(MultiStorePacketState.class);
>>>>     public transient RandomAccessFile currentFile = null;
>>>>     public long timerValue;
>>>>     public String fileName;
>>>>     public String exportedFileName;
>>>>     public String sessionKey;
>>>>     public long fileOffset = 0;
>>>>     public int partNumber = 0;
>>>>     public boolean isCoverageIncorrect = false;
>>>>     public boolean fileIsOpened = false;
>>>>     public ArrayList<ClassifierOutput> sessionMetadata = new ArrayList<>();
>>>>
>>>>     private void readObject(ObjectInputStream ois)
>>>>             throws ClassNotFoundException, IOException {
>>>>         ois.defaultReadObject();
>>>>         logger.info("Deserialized MultiStorePacketState: " + this.toString());
>>>>
>>>>         // No need to do anything in case of empty file
>>>>         if (fileName.isEmpty()) {
>>>>             return;
>>>>         }
>>>>         currentFile = new RandomAccessFile(fileName, "rw");
>>>>         currentFile.seek(fileOffset);
>>>>     }
>>>> }
>>>>
>>>> Input & output are proto files; you could replace the input with byte[]
>>>> and remove the output generation and the calls to the collector.
>>>> The test should generate & store data for a while, so that at least one
>>>> checkpoint is triggered. I used a checkpoint interval of 5000 ms on a
>>>> quite slow system.
>>>> Every data chunk is about 1 KB.
>>>> Utils.startWriteToPcap - new RandomAccessFile()
>>>> Utils.writeToPcap - should be replaced with currentFile.write(); minimal
>>>> stand-ins are sketched below.
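>>>>
>>>> Hypothetical minimal stand-ins for the Utils calls (assuming the input
>>>> is already reduced to byte[] chunks, as suggested above):
>>>>
>>>> static RandomAccessFile startWriteToPcap(String fileName, byte[] chunk)
>>>>         throws IOException {
>>>>     // Open the target file and write the first chunk.
>>>>     RandomAccessFile file = new RandomAccessFile(fileName, "rw");
>>>>     file.write(chunk);
>>>>     return file;
>>>> }
>>>>
>>>> static void continueWriteToPcap(RandomAccessFile file, byte[] chunk)
>>>>         throws IOException {
>>>>     // Append the next chunk at the current file position.
>>>>     file.write(chunk);
>>>> }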
>>>>
>>>> On Mon, Oct 11, 2021 at 4:50 PM JING ZHANG <beyond1...@gmail.com> wrote:
>>>>
>>>> Hi Alex,
>>>> That is a little weird.
>>>> Would you please provide a program that reproduces the problem,
>>>> including the DataStream job code and the related class code? I need to
>>>> do some debugging to find out the reason.
>>>>
>>>> Best,
>>>> JING ZHANG
>>>>
>>>>
>>>> On Mon, Oct 11, 2021 at 5:50 PM Alex Drobinsky <alex.drobin...@gmail.com> wrote:
>>>>
>>>> Hi Jing Zhang,
>>>>
>>>> I'm using the FileSystem backend. I also implemented a readObject
>>>> function to support a proper restart procedure:
>>>>
>>>> private void readObject(ObjectInputStream ois)
>>>>         throws ClassNotFoundException, IOException {
>>>>     ois.defaultReadObject();
>>>>     logger.info("Deserialized MultiStorePacketState: " + this.toString());
>>>>
>>>>     // No need to do anything in case of empty file
>>>>     if (fileName.isEmpty()) {
>>>>         return;
>>>>     }
>>>>     currentFile = new RandomAccessFile(fileName, "rw");
>>>>     currentFile.seek(fileOffset);
>>>> }
>>>>
>>>> However, according to the logs, this function wasn't called.
>>>>
>>>> Btw, it could be beneficial to add this kind of state object, e.g. a
>>>> FileState which would encapsulate serialization/deserialization for a
>>>> RandomAccessFile, although the concept itself is a bit contradictory to
>>>> regular state.
>>>>
>>>> Currently, I have implemented and tested a workaround by adding the
>>>> boolean variable isFileOpened; however, it's awkward because I need to
>>>> check the state of the transient variable every time I use state.value(),
>>>> roughly as sketched below.
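>>>>
>>>> A sketch of the workaround (a hypothetical helper; it re-opens the file
>>>> on demand whenever the transient field did not survive):
>>>>
>>>> private RandomAccessFile fileOf(MultiStorePacketState so) throws IOException {
>>>>     if (so.currentFile == null && so.isFileOpened) {
>>>>         // The transient field was lost; restore it from the
>>>>         // serializable fields.
>>>>         so.currentFile = new RandomAccessFile(so.fileName, "rw");
>>>>         so.currentFile.seek(so.fileOffset);
>>>>     }
>>>>     return so.currentFile;
>>>> }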
>>>>
>>>> So is it expected that transient variables in state are reset to default
>>>> values?
>>>>
>>>>
>>>> On Mon, Oct 11, 2021 at 12:33 PM JING ZHANG <beyond1...@gmail.com> wrote:
>>>>
>>>> Hi, Alex
>>>> Which state backend do you use?
>>>> If you use MemoryStateBackend or FsStateBackend, the `transient` keyword
>>>> may have no effect, because these backends do not serialize state for
>>>> regular read/write accesses but keep it as objects on the heap.
>>>> If you use RocksDBStateBackend, I would consider this expected behavior,
>>>> because RocksDBStateBackend stores all state as byte arrays in embedded
>>>> RocksDB instances and therefore de/serializes the state of a key for
>>>> every read/write access. currentFile is null because a transient field
>>>> would not be serialized by default. The contrast is sketched below.
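>>>>
>>>> The difference in one sketch (paths are placeholders; both backends
>>>> exist in Flink 1.13, though newer backend APIs are available):
>>>>
>>>> // Heap-based: state lives as Java objects, value() returns the same
>>>> // instance, so transient fields keep whatever was assigned to them.
>>>> env.setStateBackend(new FsStateBackend("file:///tmp/ckpts"));
>>>>
>>>> // RocksDB-based: every value()/update() de/serializes the object, so a
>>>> // transient field comes back as its default (null) on each read.
>>>> env.setStateBackend(new RocksDBStateBackend("file:///tmp/ckpts"));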
>>>>
>>>> Best,
>>>> JING ZHANG
>>>>
>>>>
>>>> On Mon, Oct 11, 2021 at 4:33 PM Alex Drobinsky <alex.drobin...@gmail.com> wrote:
>>>>
>>>> Dear flink community,
>>>>
>>>> I have the following state class (irrelevant fields removed):
>>>> public class MultiStorePacketState implements Serializable {
>>>>
>>>>     public transient RandomAccessFile currentFile = null;
>>>>     public long timerValue;
>>>>     public String fileName;
>>>>     public String exportedFileName;
>>>>     public String sessionKey;
>>>>     public long fileOffset = 0;
>>>>
>>>> }
>>>>
>>>> Once in a while, currentFile becomes *nullified*; this happens after I
>>>> extract the state via
>>>>
>>>> MultiStorePacketState so = state.value();
>>>>
>>>> The frequency of this behaviour matches the checkpoint interval (the
>>>> checkpoint interval is defined as 5 seconds, and the first occurrence of
>>>> the problem is also at 5 seconds); otherwise I don't have any clues
>>>> toward a possible explanation.
>>>>
>>>> Is this an expected side effect of the checkpoint procedure?
>>>>
>>>> Best regards,
>>>> Alex
>>>>
>>>>
