Hi Alex,

Could you also log the identity hash code (or something similar) of the
related instance? I suspect that it's not the field that is set to null, but
that you get a clone in which the field is null. In that case, you need to add
a specific KryoSerializer to initialize it (or just go with a lazy access
pattern all the way).
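
For example, something like this around every state access (a rough sketch;
logger and state are taken from your code):

MultiStorePacketState so = state.value();
// If the identity changes between accesses, Flink handed you a copy,
// not the instance you mutated earlier.
logger.info("state identity={} currentFile={}",
        System.identityHashCode(so), so.currentFile);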

On Tue, Oct 12, 2021 at 2:55 PM Alex Drobinsky <alex.drobin...@gmail.com>
wrote:

> Hi Jing,
>
> The job doesn't restart from the checkpoint; it's a brand new, clean job.
> No exceptions happened during execution, no restarts :)
> The state is keyed state, so a new key means a new state - in that
> situation currentFile is equal to null, as expected, and this is handled
> without issues.
> Before I even thought to ask my questions, the first thing I did was add
> log messages with the value of currentFile in every place it could be
> changed.
> So I verified that before I return control to Flink, currentFile has the
> correct value, and after I receive the value from state in the next
> iteration it is set to null.
> The checkpoints by themselves could be irrelevant to the problem; the only
> indication of a connection is the observation that the interval between the
> first event and the first occurrence of nullification is exactly the same
> as the checkpoint interval.
>
> Yun Tang - you are correct, it's a KryoSerializer: if I remove the
> "transient" qualifier from currentFile, it crashes inside KryoSerializer
> because RandomAccessFile isn't serializable.
> This also points to the fact that serialization was actually executed at
> least once. I will try an alternative approach - I will add my own
> writeObject implementation; it should work :)
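>
> Roughly like this (a sketch of what I have in mind; it assumes Java
> serialization is actually the path taken, which Kryo may bypass):
>
> private void writeObject(ObjectOutputStream oos) throws IOException {
>     if (currentFile != null) {
>         // Capture the current position so readObject can seek back to it.
>         fileOffset = currentFile.getFilePointer();
>     }
>     // currentFile is transient, so only the serializable fields are written.
>     oos.defaultWriteObject();
> }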
>
> Best regards,
> Alex
>
> On Tue, Oct 12, 2021 at 15:07, JING ZHANG <beyond1...@gmail.com> wrote:
>
>> Hi Alex,
>> Since you use `FileSystemStateBackend`, I don't think currentFile becoming
>> nullified once in a while is caused by the periodic checkpoints.
>>
>> If the job is running without failover or restore from a checkpoint,
>> reading/writing value state on `FileSystemStateBackend` does not cause
>> serialization and deserialization at all. I have already simplified your
>> code and verified this point. If you are interested, you can find the
>> simplified code in the attachment of this email.
>>
>> Some possible reasons come to my mind; I hope this helps.
>> 1. Does the job restore from a checkpoint/savepoint? This may be caused by
>> a failover or a user-triggered stop-with-savepoint.
>> 2. If the job does not restore from a checkpoint or savepoint:
>>      2.1 After reading the MultiStorePacketState from ValueState, is there
>> somewhere in your program that updates the currentFile field to null again?
>> Because the state is stored on the heap, it may be changed if the program
>> updates its value somewhere (see the sketch after this list).
>>      2.2 When currentFile became null, is it possible that the current key
>> never appeared before? That is, it's the first time the current key
>> appears, so reading the state would return the default value (a new
>> MultiStorePacketState instance with a null currentFile).
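>>
>> To illustrate 2.1 (illustrative only; with a heap-based backend,
>> state.value() returns the same object instance, not a copy):
>>
>> MultiStorePacketState so = state.value();
>> so.currentFile = null;                      // mutates the stored object directly
>> MultiStorePacketState again = state.value();
>> // 'again' is the same instance as 'so', so again.currentFile is also
>> // null now, even though state.update() was never called.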
>>
>> Best,
>> JING ZHANG
>>
>> On Tue, Oct 12, 2021 at 4:41 PM, Yun Tang <myas...@live.com> wrote:
>>
>>> Hi Alex,
>>>
>>> Since you use the customized MultiStorePacketState class as the value
>>> state type, Flink should use the Kryo serializer [1] to serialize your
>>> class when accessing RocksDB state, or when checkpointing via
>>> FileSystemStateBackend, and I don't know whether Kryo would serialize
>>> your transient field.
>>> If you're not familiar with Flink's serialization stack, I think you
>>> could check the behaviors below:
>>>
>>>    1. Without any checkpoint restore, use FileSystemStateBackend to see
>>>    whether the transient field can be read as expected; the answer should
>>>    be yes.
>>>    2. After restoring from a checkpoint, check whether the transient
>>>    field can be read back when using FileSystemStateBackend.
>>>
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/serialization/types_serialization/#flinks-typeinformation-class
>>>
>>> Best,
>>> Yun Tang
>>>
>>>
>>> ------------------------------
>>> *From:* Alex Drobinsky <alex.drobin...@gmail.com>
>>> *Sent:* Monday, October 11, 2021 22:37
>>> *To:* JING ZHANG <beyond1...@gmail.com>
>>> *Cc:* User-Flink <user@flink.apache.org>
>>> *Subject:* Re: Reset of transient variables in state to default values.
>>>
>>> It would be difficult to provide even a semblance of the complete
>>> product; however, I can try to provide enough details to reproduce the
>>> problem.
>>> A standard source would do:
>>>
>>> DataStream<byte[]> stream = env.addSource(
>>>         new FlinkKafkaConsumer<>(topic, new AbstractDeserializationSchema<byte[]>() {
>>>             @Override
>>>             public byte[] deserialize(byte[] bytes) throws IOException {
>>>                 return bytes;
>>>             }
>>>         }, properties)).name(topic);
>>>
>>>
>>> The operator body looks something like this:
>>>
>>> public class MultiStorePacketFunction extends KeyedProcessFunction<String, SplitterToMultiStore, ClassifierOutput> implements Serializable {
>>>    private transient ValueState<MultiStorePacketState> state;
>>>
>>>    @Override
>>>    public void processElement(SplitterToMultiStore packet, Context ctx, Collector<ClassifierOutput> out) throws Exception {
>>>       if (packet.hasPackets()) {
>>>          storedPackets.inc(packet.getPackets().getPacketsCount());
>>>       }
>>>
>>>       MultiStorePacketState so = state.value();
>>>       if (process(packet, out, so, ctx)) {
>>>          // Note: update(null) behaves like clear() here, so the explicit
>>>          // clear() below is redundant.
>>>          state.update(null);
>>>          state.clear();
>>>       } else {
>>>          state.update(so);
>>>       }
>>>    }
>>>
>>>    public String generateNextFilename(String sessionKey, int partNumber) {
>>>       String path = DirectoryService.getInstance().bookDirectory();
>>>       return path + File.separator + sessionKey + "-" + partNumber + ".pcap";
>>>    }
>>>
>>>    private void storeContent(Collector<ClassifierOutput> collector, MultiStorePacketState state, SplitterToMultiStore packets) throws Exception {
>>>       assert (packets != null);
>>>       assert (packets.hasPackets());
>>>
>>>       if (state.currentFile == null) {
>>>          openFile(collector, state, packets);
>>>       }
>>>
>>>       Utils.continueWriteToPcap(state.currentFile, packets.getPackets().getPacketsList());
>>>       state.fileOffset = state.currentFile.length();
>>>
>>>       tryToCloseFile(collector, state);
>>>    }
>>>
>>>    public static String extractExportedFileName(String fileName) {
>>>       String[] path = fileName.split("/");
>>>       return path[path.length - 2] + "/" + path[path.length - 1];
>>>    }
>>>
>>>    private void openFile(Collector<ClassifierOutput> collector, MultiStorePacketState state, SplitterToMultiStore packets) throws Exception {
>>>       state.fileIsOpened = true;
>>>       state.fileName = generateNextFilename(state.sessionKey, state.partNumber);
>>>       state.exportedFileName = extractExportedFileName(state.fileName);
>>>
>>>       // -> Here the RandomAccessFile is created
>>>       state.currentFile = Utils.startWriteToPcap(state.fileName, packets.getPackets().getPacketsList());
>>>       state.fileOffset = state.currentFile.length();
>>>       state.partNumber++;
>>>    }
>>>
>>>    private void tryToCloseFile(Collector<ClassifierOutput> collector, MultiStorePacketState state) throws IOException {
>>>       if (state.currentFile.length() < StorePacketConfigurationParameters.partSizeLimit) {
>>>          return;
>>>       }
>>>       closeFile(collector, state);
>>>    }
>>>
>>>    private void closeFile(Collector<ClassifierOutput> collector, MultiStorePacketState state) throws IOException {
>>>       state.currentFile.close();
>>>       state.currentFile = null;
>>>       state.fileIsOpened = false;
>>>       ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
>>>       outputBuilder.getUsualBuilder().setFileName(state.exportedFileName);
>>>       outputBuilder.setSessionType(SessionType.Multi);
>>>       outputBuilder.setSessionKey(state.sessionKey);
>>>       var classifierOutput = outputBuilder.build();
>>>       state.sessionMetadata.add(classifierOutput);
>>>       collector.collect(classifierOutput);
>>>    }
>>>
>>>    public boolean process(SplitterToMultiStore packet, Collector<ClassifierOutput> collector, MultiStorePacketState so, Context context) throws Exception {
>>>
>>>       // First message
>>>       if (packet.hasClassificationResult()) {
>>>          sendClassificationResult(packet, collector, so);
>>>          return false;
>>>       }
>>>
>>>       // Last message
>>>       if (packet.hasSessionClosure()) {
>>>          if (so.isCoverageIncorrect) {
>>>             return true;
>>>          }
>>>          handleSessionClosure(packet, collector, so, context);
>>>          return true;
>>>       }
>>>
>>>       if (so.isCoverageIncorrect) {
>>>          return false;
>>>       }
>>>       storeContent(collector, so, packet);
>>>
>>>       // The file could already be closed, e.g. when it reached the expected size.
>>>       if (so.currentFile != null) {
>>>          setupTimer(so, context.timerService());
>>>       }
>>>       return false;
>>>    }
>>>
>>>    private void handleSessionClosure(SplitterToMultiStore packet, Collector<ClassifierOutput> collector, MultiStorePacketState so, Context context) throws IOException {
>>>       if (so.currentFile != null) {
>>>          closeFile(collector, so);
>>>       }
>>>       ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
>>>       outputBuilder.setSessionKey(packet.getSessionKey());
>>>       outputBuilder.setSessionType(packet.getSessionType());
>>>       var messageBuilder = outputBuilder.getLastBuilder();
>>>       messageBuilder.addAllAggregatedSession(so.sessionMetadata);
>>>       outputBuilder.setLast(messageBuilder.build());
>>>       var output = outputBuilder.build();
>>>       collector.collect(output);
>>>       context.timerService().deleteProcessingTimeTimer(so.timerValue);
>>>       state.clear();
>>>    }
>>>
>>>    private void sendClassificationResult(SplitterToMultiStore packet, Collector<ClassifierOutput> collector, MultiStorePacketState so) {
>>>       var coverageResult = CoverageChecker.obtainCoverageInfo(packet.getClassificationResult().getLIID());
>>>       so.isCoverageIncorrect = !coverageResult.getValue0();
>>>       if (so.isCoverageIncorrect) {
>>>          return;
>>>       }
>>>
>>>       ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
>>>       outputBuilder.getFirstBuilder().setClassificationResult(packet.getClassificationResult().getClassificationResult());
>>>       outputBuilder.getFirstBuilder().setLIID(packet.getClassificationResult().getLIID());
>>>       outputBuilder.getFirstBuilder().setCIN(packet.getClassificationResult().getCIN());
>>>       outputBuilder.getFirstBuilder().setOperatorId(packet.getClassificationResult().getOperatorId());
>>>       outputBuilder.getFirstBuilder().setCoverage(coverageResult.getValue1());
>>>       outputBuilder.setSessionKey(packet.getSessionKey());
>>>       outputBuilder.setSessionType(packet.getSessionType());
>>>       so.sessionKey = packet.getSessionKey();
>>>       var classifierOutput = outputBuilder.build();
>>>       so.sessionMetadata.add(classifierOutput);
>>>       collector.collect(classifierOutput);
>>>    }
>>>
>>>    @Override
>>>    public void open(Configuration config) {
>>>       ValueStateDescriptor<MultiStorePacketState> descriptor =
>>>             new ValueStateDescriptor<MultiStorePacketState>(
>>>                   "MultiStorePacketState", // the state name
>>>                   TypeInformation.of(new TypeHint<MultiStorePacketState>() {}), // type information
>>>                   new MultiStorePacketState()); // default value of the state, if nothing was set
>>>       state = getRuntimeContext().getState(descriptor);
>>>       StorePacketConfigurationParameters.init();
>>>    }
>>>
>>>    @Override
>>>    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ClassifierOutput> collector) throws Exception {
>>>       MultiStorePacketState so = state.value();
>>>       if (so.currentFile != null) {
>>>          closeFile(collector, so);
>>>       }
>>>
>>>       ctx.timerService().deleteProcessingTimeTimer(so.timerValue);
>>>       state.update(so);
>>>    }
>>>
>>>    private void setupTimer(MultiStorePacketState so, TimerService timerService) {
>>>       // Cancel the previous timer
>>>       timerService.deleteProcessingTimeTimer(so.timerValue);
>>>       // Register a new timer, rounded down to a full second
>>>       so.timerValue = (timerService.currentProcessingTime() + StorePacketConfigurationParameters.partAggregationTimeout) / 1000 * 1000;
>>>       timerService.registerProcessingTimeTimer(so.timerValue);
>>>    }
>>> }
>>>
>>>
>>>
>>> The state class looks like the following:
>>>
>>> public class MultiStorePacketState implements Serializable {
>>>     private /*static*/ Logger logger = LoggerFactory.getLogger(MultiStorePacketState.class);
>>>     public transient RandomAccessFile currentFile = null;
>>>     public long timerValue;
>>>     public String fileName;
>>>     public String exportedFileName;
>>>     public String sessionKey;
>>>     public long fileOffset = 0;
>>>     public int partNumber = 0;
>>>     public boolean isCoverageIncorrect = false;
>>>     public boolean fileIsOpened = false;
>>>     public ArrayList<ClassifierOutput> sessionMetadata = new ArrayList<>();
>>>
>>>     private void readObject(ObjectInputStream ois)
>>>             throws ClassNotFoundException, IOException {
>>>         ois.defaultReadObject();
>>>         logger.info("Deserialized MultiStorePacketState: " + this.toString());
>>>
>>>         // No need to do anything in case of an empty file
>>>         if (fileName.isEmpty()) {
>>>             return;
>>>         }
>>>         currentFile = new RandomAccessFile(fileName, "rw");
>>>         currentFile.seek(fileOffset);
>>>     }
>>> }
>>>
>>> Input & output are Protobuf messages; you could replace the input with
>>> byte[] and remove the output generation and the calls to the collector.
>>> The test should generate & store data for a while, so that at least one
>>> checkpoint is triggered. I used a checkpoint interval of 5000 ms on a
>>> quite slow system.
>>> Every data chunk is about 1 KB.
>>> Utils.startWriteToPcap - creates a new RandomAccessFile().
>>> Utils.writeToPcap - can be replaced with currentFile.write().
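>>>
>>> For reproduction, a minimal stand-in for those helpers could look like
>>> this (my guess at their shape, assuming Protobuf packet messages that can
>>> be written as raw bytes; not the real Utils code):
>>>
>>> public final class Utils {
>>>     // Opens the file and writes the first chunk.
>>>     public static RandomAccessFile startWriteToPcap(
>>>             String fileName, List<Packet> packets) throws IOException {
>>>         RandomAccessFile file = new RandomAccessFile(fileName, "rw");
>>>         continueWriteToPcap(file, packets);
>>>         return file;
>>>     }
>>>
>>>     // Appends chunks to an already open file.
>>>     public static void continueWriteToPcap(
>>>             RandomAccessFile file, List<Packet> packets) throws IOException {
>>>         for (Packet packet : packets) {
>>>             file.write(packet.toByteArray());
>>>         }
>>>     }
>>> }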
>>>
>>> On Mon, Oct 11, 2021 at 16:50, JING ZHANG <beyond1...@gmail.com> wrote:
>>>
>>> Hi Alex,
>>> It is a little weird.
>>> Would you please provide a program which can reproduce the problem,
>>> including the DataStream job code and the related class code? I need to
>>> do some debugging to find out the reason.
>>>
>>> Best,
>>> JING ZHANG
>>>
>>>
>>> On Mon, Oct 11, 2021 at 5:50 PM, Alex Drobinsky <alex.drobin...@gmail.com> wrote:
>>>
>>> Hi Jing Zhang,
>>>
>>> I'm using the FileSystem backend. I also implemented a readObject
>>> function to support a proper restart procedure:
>>>
>>> private void readObject(ObjectInputStream ois)
>>>         throws ClassNotFoundException, IOException {
>>>     ois.defaultReadObject();
>>>     logger.info("Deserialized MultiStorePacketState: " + this.toString());
>>>
>>>     // No need to do anything in case of empty file
>>>     if (fileName.isEmpty()) {
>>>         return;
>>>     }
>>>     currentFile = new RandomAccessFile(fileName,"rw");
>>>     currentFile.seek(fileOffset);
>>> }
>>>
>>> However, according to the logs, this function wasn't called.
>>>
>>> Btw, it could be beneficial to add this kind of state object, e.g. a
>>> FileState which would encapsulate the serialization / deserialization of
>>> a RandomAccessFile, although the concept itself is a bit contradictory to
>>> regular state.
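>>>
>>> Purely as a sketch of the idea (FileState is hypothetical, not an
>>> existing Flink API):
>>>
>>> public class FileState implements Serializable {
>>>     private String fileName;
>>>     private long offset;
>>>     private transient RandomAccessFile file;
>>>
>>>     public FileState(String fileName) {
>>>         this.fileName = fileName;
>>>     }
>>>
>>>     // Lazily (re)opens the file, e.g. after deserialization.
>>>     public RandomAccessFile get() throws IOException {
>>>         if (file == null) {
>>>             file = new RandomAccessFile(fileName, "rw");
>>>             file.seek(offset);
>>>         }
>>>         return file;
>>>     }
>>>
>>>     private void writeObject(ObjectOutputStream oos) throws IOException {
>>>         if (file != null) {
>>>             offset = file.getFilePointer(); // remember the position for restore
>>>         }
>>>         oos.defaultWriteObject();
>>>     }
>>> }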
>>>
>>> Currently, I have implemented and tested a workaround by adding a boolean
>>> variable isFileOpened; however, it's awkward because I need to check the
>>> state of the transient variable every time I use state.value().
>>>
>>> So, is it expected that transient variables in state are reset to default
>>> values?
>>>
>>>
>>> On Mon, Oct 11, 2021 at 12:33, JING ZHANG <beyond1...@gmail.com> wrote:
>>>
>>> Hi Alex,
>>> Which state backend do you use?
>>> If you choose MemoryStateBackend or FsStateBackend, the `transient`
>>> keyword may have no effect, because these backends do not serialize state
>>> for regular read/write accesses but keep it as objects on the heap.
>>> If you choose RocksDBStateBackend, I would consider it expected behavior,
>>> because RocksDBStateBackend stores all state as byte arrays in embedded
>>> RocksDB instances. Therefore, it de/serializes the state of a key for
>>> every read/write access. currentFile is null because a transient variable
>>> is not serialized by default.
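>>>
>>> For reference, the backend is selected like this (a sketch using the
>>> Flink 1.13-era API; the checkpoint path is a placeholder):
>>>
>>> // Heap-based: state lives as objects on the heap, no per-access serialization.
>>> env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"));
>>> // RocksDB-based: state is de/serialized on every read/write access.
>>> env.setStateBackend(new RocksDBStateBackend("file:///tmp/checkpoints"));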
>>>
>>> Best,
>>> JING ZHANG
>>>
>>>
>>> On Mon, Oct 11, 2021 at 4:33 PM, Alex Drobinsky <alex.drobin...@gmail.com> wrote:
>>>
>>> Dear flink community,
>>>
>>> I have the following state class (irrelevant fields removed):
>>> public class MultiStorePacketState implements Serializable {
>>>
>>>     public transient RandomAccessFile currentFile = null;
>>>     public long timerValue;
>>>     public String fileName;
>>>     public String exportedFileName;
>>>     public String sessionKey;
>>>     public long fileOffset = 0;
>>>
>>> }
>>>
>>> Once in a while, currentFile becomes *nullified*; this happens after I
>>> extract the state via
>>>
>>> MultiStorePacketState so = state.value();
>>>
>>> The frequency of this behaviour matches the checkpoint interval (the
>>> checkpoint interval is defined as 5 seconds, and the first occurrence of
>>> the problem is also at 5 seconds); otherwise, I don't have any clues
>>> towards a possible explanation.
>>>
>>> Is this an expected side effect of the checkpoint procedure?
>>>
>>> Best regards,
>>> Alex
>>>
>>>
