Hi Alex,
Since you use `FileSystemStateBackend`, I don't think the periodic
checkpoint is what causes currentFile to become null once in a while.

As long as the job runs without a failover or a restore from a checkpoint,
reading and writing value state on `FileSystemStateBackend` does not involve
serialization or deserialization at all. I have simplified your code and
verified this point. If you are interested, the simplified code is attached
to this email.
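
To illustrate the difference outside of Flink, here is a minimal sketch. It
uses plain Java serialization as a stand-in, which is an assumption: for a
class like this Flink would most likely go through Kryo, whose handling of
transient fields may differ in detail. PacketState and roundTrip are
hypothetical names, and a plain Object stands in for the RandomAccessFile
handle. An object kept by reference on the heap keeps its transient field;
only a serialize/deserialize round trip drops it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class TransientRoundTripSketch {

    // Hypothetical stand-in for MultiStorePacketState.
    static class PacketState implements Serializable {
        private static final long serialVersionUID = 1L;
        transient Object currentFile; // stands in for the RandomAccessFile handle
        String fileName;
    }

    // Serializes and deserializes the object with plain Java serialization
    // (an assumption; this is not Flink's serializer stack).
    static PacketState roundTrip(PacketState in) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(in);
            }
            try (ObjectInputStream ois =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (PacketState) ois.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        PacketState onHeap = new PacketState();
        onHeap.currentFile = new Object();
        onHeap.fileName = "session-0.pcap";

        // Kept by reference: the transient field is still there.
        System.out.println("heap copy has file: " + (onHeap.currentFile != null));

        // After a round trip: the transient field is gone, regular fields survive.
        PacketState restored = roundTrip(onHeap);
        System.out.println("restored has file: " + (restored.currentFile != null));
        System.out.println("restored fileName: " + restored.fileName);
    }
}
```

If the field only ever disappears after a restore, this is the kind of
behavior to expect; if it disappears while the job runs undisturbed on a
heap-based backend, serialization is an unlikely cause.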

A few possible causes come to mind; I hope this helps.
1. Does the job restore from a checkpoint or savepoint? This can be caused
by a failover or by a user-triggered stop-with-savepoint.
2. If the job does not restore from a checkpoint or savepoint:
     2.1 After reading the MultiStorePacketState from the ValueState, does
your program set the currentFile field back to null somewhere? Because the
state is stored on the heap, it changes whenever the program updates the
object.
     2.2 When currentFile becomes null, is it possible that the current key
has never appeared before? If it is the first time the key appears, reading
the state returns the default value (a new MultiStorePacketState instance
with a null currentFile).
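
Cases 2.1 and 2.2 can be sketched without Flink. The miniature below is
only a model of a heap-backed keyed value state, not Flink's API:
HeapValueState, PacketState and the per-key HashMap are hypothetical, and a
plain Object stands in for the RandomAccessFile handle. It shows that a
value read from heap state is the stored object itself, so setting a field
to null anywhere is visible on the next read, and that a key seen for the
first time yields a fresh default instance.

```java
import java.util.HashMap;
import java.util.Map;

class HeapStateSketch {

    // Hypothetical stand-in for MultiStorePacketState.
    static class PacketState {
        Object currentFile; // stands in for the RandomAccessFile handle
    }

    // Hypothetical model of a heap-backed keyed value state (not Flink's API).
    static class HeapValueState {
        private final Map<String, PacketState> perKey = new HashMap<>();

        // Returns the stored object itself, or a fresh default for an unseen key.
        PacketState value(String key) {
            PacketState stored = perKey.get(key);
            return stored != null ? stored : new PacketState();
        }

        void update(String key, PacketState state) {
            perKey.put(key, state);
        }
    }

    public static void main(String[] args) {
        HeapValueState state = new HeapValueState();

        // Open a "file" for key k1 and store the state.
        PacketState so = state.value("k1");
        so.currentFile = new Object();
        state.update("k1", so);

        // Case 2.1: some other code path nulls the field on the shared object.
        state.value("k1").currentFile = null;
        System.out.println("k1 has file: " + (state.value("k1").currentFile != null));

        // Case 2.2: a key that never appeared before gets the default value.
        System.out.println("k2 has file: " + (state.value("k2").currentFile != null));
    }
}
```

In the quoted operator, closeFile assigns null to currentFile, so logging
the key and the call site there may help tell these two cases apart.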

Best,
JING ZHANG

Yun Tang <myas...@live.com> wrote on Tue, Oct 12, 2021 at 4:41 PM:

> Hi Alex,
>
> Since you use a custom MultiStorePacketState class as the value state
> type, Flink should use the Kryo serializer [1] to serialize your class when
> accessing RocksDB state or when checkpointing with FileSystemStateBackend,
> and I don't know whether Kryo would serialize your transient field.
> If you're not familiar with Flink's serialization stack, I think you could
> check the behaviors below:
>
>    1. Without any checkpoint restore, use FileSystemStateBackend and see
>    whether the transient field can be read as expected; the answer should be
>    yes.
>    2. After restoring from a checkpoint, check whether the transient field
>    can still be read back when using FileSystemStateBackend.
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/serialization/types_serialization/#flinks-typeinformation-class
>
> Best
> Yun Tang
>
>
> ------------------------------
> *From:* Alex Drobinsky <alex.drobin...@gmail.com>
> *Sent:* Monday, October 11, 2021 22:37
> *To:* JING ZHANG <beyond1...@gmail.com>
> *Cc:* User-Flink <user@flink.apache.org>
> *Subject:* Re: Reset of transient variables in state to default values.
>
> It would be difficult to provide even a semblance of the complete product;
> however, I could try to provide enough details to reproduce the problem.
> A standard source would do:
>
> DataStream<byte[]> stream = env.addSource(
>         new FlinkKafkaConsumer<>(topic, new AbstractDeserializationSchema<byte[]>() {
>             @Override
>             public byte[] deserialize(byte[] bytes) throws IOException {
>                 return bytes;
>             }
>         }, properties)).name(topic);
>
>
> The operator body is something like:
>
> public class MultiStorePacketFunction extends KeyedProcessFunction<String, 
> SplitterToMultiStore, ClassifierOutput> implements Serializable {
>    private transient ValueState<MultiStorePacketState> state;
>
>    @Override
>    public void processElement(SplitterToMultiStore packet, Context ctx, 
> Collector<ClassifierOutput> out) throws Exception {
>       if (packet.hasPackets()) {
>          storedPackets.inc(packet.getPackets().getPacketsCount());
>       }
>
>       MultiStorePacketState so = state.value();
>       if (process(packet, out, so, ctx)) {
>          state.update(null);
>          state.clear();
>       } else {
>          state.update(so);
>       }
>    }
>
>     public String generateNextFilename(String sessionKey, int partNumber) {
>       String path = DirectoryService.getInstance().bookDirectory();
>       return path + File.separator + sessionKey + "-" + partNumber + ".pcap";
>    }
>
>    private void storeContent(Collector<ClassifierOutput> collector, 
> MultiStorePacketState state, SplitterToMultiStore packets) throws Exception {
>       assert (packets != null);
>       assert (packets.hasPackets());
>
>       if ( state.currentFile == null) {
>          openFile(collector, state, packets);
>       }
>
>       Utils.continueWriteToPcap(state.currentFile, 
> packets.getPackets().getPacketsList());
>       state.fileOffset = state.currentFile.length();
>
>       tryToCloseFile(collector, state);
>    }
>
>    static public String extractExportedFileName(String fileName) {
>       String path[] = fileName.split("/");
>       return path[path.length - 2] + "/" + path[path.length - 1];
>    }
>
>    private void openFile(Collector<ClassifierOutput> collector, 
> MultiStorePacketState state, SplitterToMultiStore packets) throws Exception {
>       state.fileIsOpened = true;
>       state.fileName = generateNextFilename(state.sessionKey, 
> state.partNumber);
>       state.exportedFileName = extractExportedFileName(state.fileName);
>
> // -> Here RandomAccessFile created
>       state.currentFile = Utils.startWriteToPcap(state.fileName, 
> packets.getPackets().getPacketsList());
>       state.fileOffset = state.currentFile.length();
>       state.partNumber++;
>    }
>
>    private void tryToCloseFile(Collector<ClassifierOutput> collector, 
> MultiStorePacketState state) throws IOException {
>       if (state.currentFile.length() < 
> StorePacketConfigurationParameters.partSizeLimit) {
>          return;
>       }
>       closeFile(collector, state);
>    }
>
>    private void closeFile(Collector<ClassifierOutput> collector, 
> MultiStorePacketState state) throws IOException {
>       state.currentFile.close();
>       state.currentFile = null;
>       state.fileIsOpened = false;
>       ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
>       outputBuilder.getUsualBuilder().setFileName(state.exportedFileName);
>       outputBuilder.setSessionType(SessionType.Multi);
>       outputBuilder.setSessionKey(state.sessionKey);
>       var classifierOutput = outputBuilder.build();
>       state.sessionMetadata.add(classifierOutput);
>       collector.collect(classifierOutput);
>    }
>
>     public boolean process(SplitterToMultiStore packet, 
> Collector<ClassifierOutput> collector, MultiStorePacketState so, Context 
> context) throws Exception {
>
>       // First message
>       if (packet.hasClassificationResult()) {
>          sendClassificationResult(packet, collector, so);
>          return false;
>       }
>
>       // Last message
>       if (packet.hasSessionClosure()) {
>          if (so.isCoverageIncorrect) {
>             return true;
>          }
>          handleSessionClosure(packet, collector, so, context);
>          return true;
>       }
>
>       if (so.isCoverageIncorrect) {
>          return false;
>       }
>       storeContent(collector, so, packet);
>
>       // File could be already close e.g. it reached expected size.
>       if (so.currentFile != null) {
>          setupTimer(so, context.timerService());
>       }
>       return false;
>    }
>
>    private void handleSessionClosure(SplitterToMultiStore packet, 
> Collector<ClassifierOutput> collector, MultiStorePacketState so, Context 
> context) throws IOException {
>       if (so.currentFile != null) {
>          closeFile(collector, so);
>       }
>       ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
>       outputBuilder.setSessionKey(packet.getSessionKey());
>       outputBuilder.setSessionType(packet.getSessionType());
>       var messageBuilder = outputBuilder.getLastBuilder();
>       messageBuilder.addAllAggregatedSession(so.sessionMetadata);
>       outputBuilder.setLast(messageBuilder.build());
>       var output = outputBuilder.build();
>       collector.collect(output);
>       context.timerService().deleteProcessingTimeTimer(so.timerValue);
>       state.clear();
>    }
>
>    private void sendClassificationResult(SplitterToMultiStore packet, 
> Collector<ClassifierOutput> collector, MultiStorePacketState so) {
>       var coverageResult = 
> CoverageChecker.obtainCoverageInfo(packet.getClassificationResult().getLIID());
>       so.isCoverageIncorrect = !coverageResult.getValue0();
>       if (so.isCoverageIncorrect) {
>          return;
>       }
>
>       ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
>       
> outputBuilder.getFirstBuilder().setClassificationResult(packet.getClassificationResult().getClassificationResult());
>       
> outputBuilder.getFirstBuilder().setLIID(packet.getClassificationResult().getLIID());
>       
> outputBuilder.getFirstBuilder().setCIN(packet.getClassificationResult().getCIN());
>       
> outputBuilder.getFirstBuilder().setOperatorId(packet.getClassificationResult().getOperatorId());
>       outputBuilder.getFirstBuilder().setCoverage(coverageResult.getValue1());
>       outputBuilder.setSessionKey(packet.getSessionKey());
>       outputBuilder.setSessionType(packet.getSessionType());
>       so.sessionKey = packet.getSessionKey();
>       var classifierOutput = outputBuilder.build();
>       so.sessionMetadata.add(classifierOutput);
>       collector.collect(classifierOutput);
>    }
>
>     @Override
>    public void open(Configuration config) {
>       ValueStateDescriptor<MultiStorePacketState> descriptor =
>             new ValueStateDescriptor<MultiStorePacketState>(
>                   "MultiStorePacketState", // the state name
>                   TypeInformation.of(new TypeHint<MultiStorePacketState>() {}), // type information
>                   new MultiStorePacketState()); // default value of the state, if nothing was set
>       state = getRuntimeContext().getState(descriptor);
>       StorePacketConfigurationParameters.init();
>    }
>
>    @Override
>    public void onTimer(long timestamp, OnTimerContext ctx, 
> Collector<ClassifierOutput> collector) throws Exception {
>       MultiStorePacketState so = state.value();
>       if (so.currentFile != null) {
>          closeFile(collector, so);
>       }
>
>       ctx.timerService().deleteProcessingTimeTimer(so.timerValue);
>       state.update(so);
>    }
>
>    private void setupTimer(MultiStorePacketState so, TimerService 
> timerService) {
>       // Cancel previous timer
>       timerService.deleteProcessingTimeTimer(so.timerValue);
>       // Register new timer
>       so.timerValue = (timerService.currentProcessingTime() + 
> StorePacketConfigurationParameters.partAggregationTimeout)/1000*1000;
>       timerService.registerProcessingTimeTimer(so.timerValue);
>    }
> }
>
>
>
> The state class looks like the following:
>
> public class MultiStorePacketState implements Serializable {
>     private /*static*/ Logger logger = 
> LoggerFactory.getLogger(MultiStorePacketState.class);
>     public transient RandomAccessFile currentFile = null;
>     public long timerValue;
>     public String fileName;
>     public String exportedFileName;
>     public String sessionKey;
>     public long fileOffset = 0;
>     public int partNumber = 0;
>     public boolean isCoverageIncorrect = false;
>     public boolean fileIsOpened = false;
>     public ArrayList<ClassifierOutput> sessionMetadata = new ArrayList<>();
>
>     private void readObject(ObjectInputStream ois)
>             throws ClassNotFoundException, IOException {
>         ois.defaultReadObject();
>         logger.info("Deserialized MultiStorePacketState: " + this.toString());
>
>         // No need to do anything in case of empty file
>         if (fileName.isEmpty()) {
>             return;
>         }
>         currentFile = new RandomAccessFile(fileName,"rw");
>         currentFile.seek(fileOffset);
>     }
> }
>
> Input and output are Proto files; you could replace the input with byte[]
> and remove the output generation and the calls to the collector.
> The test should generate and store data for a while, so that at least one
> checkpoint is triggered. I used a checkpoint interval of 5000 ms on a
> quite slow system.
> Every data chunk is about 1k.
> Utils.startWriteToPcap - new RandomAccessFile()
> Utils.writeToPcap - should be replaced with currentFile.write()
>
> On Mon, Oct 11, 2021 at 16:50, JING ZHANG <beyond1...@gmail.com> wrote:
>
> Hi Alex,
> That is a little weird.
> Could you please provide a program that reproduces the problem,
> including the DataStream job code and the related classes? I need to do
> some debugging to find the cause.
>
> Best,
> JING ZHANG
>
>
> Alex Drobinsky <alex.drobin...@gmail.com> wrote on Mon, Oct 11, 2021 at 5:50 PM:
>
> Hi Jing Zhang,
>
> I'm using the FileSystem backend. I also implemented a readObject function
> to support a proper restart procedure:
>
> private void readObject(ObjectInputStream ois)
>         throws ClassNotFoundException, IOException {
>     ois.defaultReadObject();
>     logger.info("Deserialized MultiStorePacketState: " + this.toString());
>
>     // No need to do anything in case of empty file
>     if (fileName.isEmpty()) {
>         return;
>     }
>     currentFile = new RandomAccessFile(fileName,"rw");
>     currentFile.seek(fileOffset);
> }
>
> However, according to the logs, this function wasn't called.
>
> Btw, it could be beneficial to add this kind of State object e.g. FileState 
> which will encapsulate serialization / deserialization for RandomAccessFile 
> although the concept itself is a bit contradictory to regular state.
>
> Currently, I implemented and tested a workaround via addition of the boolean 
> variable isFileOpened, however it's awkward because I need to check the state 
> of the transient variable every time I use state.value().
>
> So should it be expected that transient variables in state are reset
> to their default values?
>
>
> On Mon, Oct 11, 2021 at 12:33, JING ZHANG <beyond1...@gmail.com> wrote:
>
> Hi Alex,
> Which state backend do you use?
> If you use MemoryStateBackend or FsStateBackend, the `transient` keyword
> may have no effect, because these backends do not serialize state for
> regular read/write accesses but keep it as objects on the heap.
> If you use RocksDBStateBackend, I think this is the expected behavior,
> because RocksDBStateBackend stores all state as byte arrays in embedded
> RocksDB instances. Therefore, it de/serializes the state of a key on every
> read/write access. currentFile is null because transient fields
> are not serialized by default.
>
> Best,
> JING ZHANG
>
>
> Alex Drobinsky <alex.drobin...@gmail.com> wrote on Mon, Oct 11, 2021 at 4:33 PM:
>
> Dear Flink community,
>
> I have the following state class (irrelevant fields removed):
> public class MultiStorePacketState implements Serializable {
>
>     public transient RandomAccessFile currentFile = null;
>     public long timerValue;
>     public String fileName;
>     public String exportedFileName;
>     public String sessionKey;
>     public long fileOffset = 0;
>
> }
>
> Once in a while, currentFile becomes null. This happens after I
> extract the state via
>
> MultiStorePacketState so = state.value();
>
> The frequency of this behaviour is similar to the checkpoint interval (the
> checkpoint interval is defined as 5 seconds, and the first occurrence of
> this problem is also at 5 seconds); otherwise I don't have any clues to a
> possible explanation.
>
> Is it an expected side effect of the checkpoint procedure?
>
> Best regards,
> Alex
>
>

Attachment: Test.java
Description: Binary data
