Hi Jing,

The job doesn't restart from a checkpoint - it's a brand-new, clean job: no
exceptions happened during execution, no restarts :)
The state is keyed state, so a new key means a new state object - in that
situation currentFile equals null, which is expected and handled without
issues.
Before I even thought to ask my questions, the first thing I did was add
log messages printing the value of currentFile everywhere it could change.
That confirmed that currentFile holds the correct value right before I
return control to Flink, yet after I read the value back from state in the
next iteration it is null.
The checkpoints themselves may be irrelevant to the problem; the only hint
of a connection is my observation that the interval between the first
event and the first occurrence of the nullification exactly matches the
checkpoint interval.
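
For reference, checkpointing is enabled in the standard way (a sketch;
env is the StreamExecutionEnvironment):

env.enableCheckpointing(5000); // 5 s interval - matches the observed period above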

Yun Tang - you are correct, it's a KryoSerializer: if I remove the
"transient" qualifier from currentFile, the job crashes inside the
KryoSerializer because RandomAccessFile isn't serializable.
This also confirms that serialization was actually executed at least once.
I will try an alternative approach - I will add my own writeObject
implementation; it should work :)
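
For reference, a minimal sketch of the idea (hedged: Kryo's default
FieldSerializer does not call these Java serialization hooks, so they only
take effect where plain Java serialization is used):

private void writeObject(ObjectOutputStream oos) throws IOException {
    // currentFile stays transient, so defaultWriteObject() skips it;
    // fileName and fileOffset carry enough information to reopen it.
    oos.defaultWriteObject();
}

private void readObject(ObjectInputStream ois)
        throws ClassNotFoundException, IOException {
    ois.defaultReadObject();
    if (fileName == null || fileName.isEmpty()) {
        return;
    }
    // Reopen the file and restore the write position.
    currentFile = new RandomAccessFile(fileName, "rw");
    currentFile.seek(fileOffset);
}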

Best regards,
Alex

On Tue, Oct 12, 2021 at 15:07, JING ZHANG <beyond1...@gmail.com> wrote:

> Hi Alex,
> Since you use `FileSystemStateBackend`, I don't think currentFile becoming
> nullified once in a while is caused by the periodic checkpoints.
>
> If the job runs without failover or restoring from a checkpoint,
> reading/writing value state on `FileSystemStateBackend` does not cause any
> serialization or deserialization at all. I have already simplified your
> code and verified this point. If you are interested, you can find the
> simplified code in the attachment of this email.
>
> Some possible reasons come to my mind; I hope this helps.
> 1. Does the job restore from a checkpoint/savepoint? This may be caused by
> a failover or a user-triggered stop-with-savepoint.
> 2. If the job does not restore from a checkpoint or savepoint:
>      2.1 After reading the MultiStorePacketState from ValueState, is there
> somewhere in your program that sets the currentFile field back to null?
> Because the state is stored on the heap, it may change if the program
> updates its value somewhere.
>      2.2 When currentFile became null, is it possible that the current key
> never appeared before? That is, it is the first time the current key
> appears, so reading the state would return the default value (a new
> MultiStorePacketState instance with a null currentFile); see the sketch
> below.
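>
> A quick way to rule case 2.2 in or out (a hedged sketch; it relies on
> sessionKey only being assigned after the first message for a key, and
> LOG is a hypothetical logger):
>
> MultiStorePacketState so = state.value();
> if (so.sessionKey == null) {
>     // This is the default instance for a never-seen key, not a
>     // deserialized state object whose transient field was reset.
>     LOG.info("First access for key {}", ctx.getCurrentKey());
> }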
>
> Best,
> JING ZHANG
>
> On Tue, Oct 12, 2021 at 4:41 PM, Yun Tang <myas...@live.com> wrote:
>
>> Hi Alex,
>>
>> Since you use the custom MultiStorePacketState class as the value state
>> type, Flink should use the Kryo serializer [1] to serialize your class
>> when accessing RocksDB state or when checkpointing with
>> FileSystemStateBackend, and I don't know whether Kryo would serialize
>> your transient field.
>> If you're not familiar with Flink's serialization stack, I think you
>> could check the behaviors below:
>>
>>    1. Without any checkpoint restore, use FileSystemStateBackend to see
>>    whether the transient field can be read back as expected; the answer
>>    should be yes.
>>    2. After restoring from a checkpoint, check whether the transient
>>    field can be read back when using FileSystemStateBackend.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/serialization/types_serialization/#flinks-typeinformation-class
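>>
>> A standalone way to check whether Kryo keeps the transient field (a
>> rough sketch against Kryo's plain API with default settings, outside
>> Flink's serializer stack; stateInstance is assumed to be a populated
>> MultiStorePacketState):
>>
>> Kryo kryo = new Kryo();
>> ByteArrayOutputStream bytes = new ByteArrayOutputStream();
>> Output output = new Output(bytes);
>> kryo.writeObject(output, stateInstance);
>> output.close();
>>
>> Input input = new Input(new ByteArrayInputStream(bytes.toByteArray()));
>> MultiStorePacketState copy = kryo.readObject(input, MultiStorePacketState.class);
>> // With Kryo's default FieldSerializer, transient fields are skipped,
>> // so copy.currentFile should come back null.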
>>
>> Best
>> Yun Tang
>>
>>
>> ------------------------------
>> *From:* Alex Drobinsky <alex.drobin...@gmail.com>
>> *Sent:* Monday, October 11, 2021 22:37
>> *To:* JING ZHANG <beyond1...@gmail.com>
>> *Cc:* User-Flink <user@flink.apache.org>
>> *Subject:* Re: Reset of transient variables in state to default values.
>>
>> It would be difficult to provide even a semblance of the complete
>> product, but I can try to provide enough details to reproduce the
>> problem. A standard source will do:
>>
>> DataStream<byte[]> stream = env.addSource(
>>         new FlinkKafkaConsumer<>(topic, new AbstractDeserializationSchema<byte[]>() {
>>             @Override
>>             public byte[] deserialize(byte[] bytes) throws IOException {
>>                 return bytes;
>>             }
>>         }, properties)).name(topic);
>>
>>
>> The operator body looks something like this:
>>
>> public class MultiStorePacketFunction
>>       extends KeyedProcessFunction<String, SplitterToMultiStore, ClassifierOutput>
>>       implements Serializable {
>>    private transient ValueState<MultiStorePacketState> state;
>>
>>    @Override
>>    public void processElement(SplitterToMultiStore packet, Context ctx,
>>          Collector<ClassifierOutput> out) throws Exception {
>>       if (packet.hasPackets()) {
>>          // storedPackets is presumably a metrics counter; its declaration is omitted here
>>          storedPackets.inc(packet.getPackets().getPacketsCount());
>>       }
>>
>>       MultiStorePacketState so = state.value();
>>       if (process(packet, out, so, ctx)) {
>>          // update(null) already removes the value, so the clear() below is redundant
>>          state.update(null);
>>          state.clear();
>>       } else {
>>          state.update(so);
>>       }
>>    }
>>
>>     public String generateNextFilename(String sessionKey, int partNumber) {
>>       String path = DirectoryService.getInstance().bookDirectory();
>>       return path + File.separator + sessionKey + "-" + partNumber + ".pcap";
>>    }
>>
>>    private void storeContent(Collector<ClassifierOutput> collector,
>>          MultiStorePacketState state, SplitterToMultiStore packets) throws Exception {
>>       assert (packets != null);
>>       assert (packets.hasPackets());
>>
>>       if (state.currentFile == null) {
>>          openFile(collector, state, packets);
>>       }
>>
>>       Utils.continueWriteToPcap(state.currentFile, packets.getPackets().getPacketsList());
>>       state.fileOffset = state.currentFile.length();
>>
>>       tryToCloseFile(collector, state);
>>    }
>>
>>    public static String extractExportedFileName(String fileName) {
>>       String[] path = fileName.split("/");
>>       return path[path.length - 2] + "/" + path[path.length - 1];
>>    }
>>
>>    private void openFile(Collector<ClassifierOutput> collector,
>>          MultiStorePacketState state, SplitterToMultiStore packets) throws Exception {
>>       state.fileIsOpened = true;
>>       state.fileName = generateNextFilename(state.sessionKey, state.partNumber);
>>       state.exportedFileName = extractExportedFileName(state.fileName);
>>
>>       // -> The RandomAccessFile is created here
>>       state.currentFile = Utils.startWriteToPcap(state.fileName, packets.getPackets().getPacketsList());
>>       state.fileOffset = state.currentFile.length();
>>       state.partNumber++;
>>    }
>>
>>    private void tryToCloseFile(Collector<ClassifierOutput> collector,
>>          MultiStorePacketState state) throws IOException {
>>       if (state.currentFile.length() <
>>             StorePacketConfigurationParameters.partSizeLimit) {
>>          return;
>>       }
>>       closeFile(collector, state);
>>    }
>>
>>    private void closeFile(Collector<ClassifierOutput> collector,
>>          MultiStorePacketState state) throws IOException {
>>       state.currentFile.close();
>>       state.currentFile = null;
>>       state.fileIsOpened = false;
>>       ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
>>       outputBuilder.getUsualBuilder().setFileName(state.exportedFileName);
>>       outputBuilder.setSessionType(SessionType.Multi);
>>       outputBuilder.setSessionKey(state.sessionKey);
>>       var classifierOutput = outputBuilder.build();
>>       state.sessionMetadata.add(classifierOutput);
>>       collector.collect(classifierOutput);
>>    }
>>
>>    public boolean process(SplitterToMultiStore packet,
>>          Collector<ClassifierOutput> collector, MultiStorePacketState so,
>>          Context context) throws Exception {
>>
>>       // First message
>>       if (packet.hasClassificationResult()) {
>>          sendClassificationResult(packet, collector, so);
>>          return false;
>>       }
>>
>>       // Last message
>>       if (packet.hasSessionClosure()) {
>>          if (so.isCoverageIncorrect) {
>>             return true;
>>          }
>>          handleSessionClosure(packet, collector, so, context);
>>          return true;
>>       }
>>
>>       if (so.isCoverageIncorrect) {
>>          return false;
>>       }
>>       storeContent(collector, so, packet);
>>
>>       // The file could already be closed, e.g. if it reached the expected size.
>>       if (so.currentFile != null) {
>>          setupTimer(so, context.timerService());
>>       }
>>       return false;
>>    }
>>
>>    private void handleSessionClosure(SplitterToMultiStore packet,
>>          Collector<ClassifierOutput> collector, MultiStorePacketState so,
>>          Context context) throws IOException {
>>       if (so.currentFile != null) {
>>          closeFile(collector, so);
>>       }
>>       ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
>>       outputBuilder.setSessionKey(packet.getSessionKey());
>>       outputBuilder.setSessionType(packet.getSessionType());
>>       var messageBuilder = outputBuilder.getLastBuilder();
>>       messageBuilder.addAllAggregatedSession(so.sessionMetadata);
>>       outputBuilder.setLast(messageBuilder.build());
>>       var output = outputBuilder.build();
>>       collector.collect(output);
>>       context.timerService().deleteProcessingTimeTimer(so.timerValue);
>>       state.clear();
>>    }
>>
>>    private void sendClassificationResult(SplitterToMultiStore packet,
>>          Collector<ClassifierOutput> collector, MultiStorePacketState so) {
>>       var coverageResult =
>>             CoverageChecker.obtainCoverageInfo(packet.getClassificationResult().getLIID());
>>       so.isCoverageIncorrect = !coverageResult.getValue0();
>>       if (so.isCoverageIncorrect) {
>>          return;
>>       }
>>
>>       ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
>>       outputBuilder.getFirstBuilder().setClassificationResult(packet.getClassificationResult().getClassificationResult());
>>       outputBuilder.getFirstBuilder().setLIID(packet.getClassificationResult().getLIID());
>>       outputBuilder.getFirstBuilder().setCIN(packet.getClassificationResult().getCIN());
>>       outputBuilder.getFirstBuilder().setOperatorId(packet.getClassificationResult().getOperatorId());
>>       outputBuilder.getFirstBuilder().setCoverage(coverageResult.getValue1());
>>       outputBuilder.setSessionKey(packet.getSessionKey());
>>       outputBuilder.setSessionType(packet.getSessionType());
>>       so.sessionKey = packet.getSessionKey();
>>       var classifierOutput = outputBuilder.build();
>>       so.sessionMetadata.add(classifierOutput);
>>       collector.collect(classifierOutput);
>>    }
>>
>>    @Override
>>    public void open(Configuration config) {
>>       ValueStateDescriptor<MultiStorePacketState> descriptor =
>>             new ValueStateDescriptor<>(
>>                   "MultiStorePacketState", // the state name
>>                   TypeInformation.of(new TypeHint<MultiStorePacketState>() {}), // type information
>>                   new MultiStorePacketState()); // default value of the state, if nothing was set
>>       state = getRuntimeContext().getState(descriptor);
>>       StorePacketConfigurationParameters.init();
>>    }
>>
>>    @Override
>>    public void onTimer(long timestamp, OnTimerContext ctx,
>>          Collector<ClassifierOutput> collector) throws Exception {
>>       MultiStorePacketState so = state.value();
>>       if (so.currentFile != null) {
>>          closeFile(collector, so);
>>       }
>>
>>       ctx.timerService().deleteProcessingTimeTimer(so.timerValue);
>>       state.update(so);
>>    }
>>
>>    private void setupTimer(MultiStorePacketState so, TimerService timerService) {
>>       // Cancel the previous timer
>>       timerService.deleteProcessingTimeTimer(so.timerValue);
>>       // Register a new timer, rounded down to a whole second
>>       so.timerValue = (timerService.currentProcessingTime()
>>             + StorePacketConfigurationParameters.partAggregationTimeout) / 1000 * 1000;
>>       timerService.registerProcessingTimeTimer(so.timerValue);
>>    }
>> }
>>
>>
>>
>> The state class looks like the following:
>>
>> public class MultiStorePacketState implements Serializable {
>>     private /*static*/ Logger logger =
>>             LoggerFactory.getLogger(MultiStorePacketState.class);
>>     public transient RandomAccessFile currentFile = null;
>>     public long timerValue;
>>     public String fileName;
>>     public String exportedFileName;
>>     public String sessionKey;
>>     public long fileOffset = 0;
>>     public int partNumber = 0;
>>     public boolean isCoverageIncorrect = false;
>>     public boolean fileIsOpened = false;
>>     public ArrayList<ClassifierOutput> sessionMetadata = new ArrayList<>();
>>
>>     private void readObject(ObjectInputStream ois)
>>             throws ClassNotFoundException, IOException {
>>         ois.defaultReadObject();
>>         logger.info("Deserialized MultiStorePacketState: " + this.toString());
>>
>>         // No need to do anything in case of an empty file
>>         // (the null check guards against a fresh state instance)
>>         if (fileName == null || fileName.isEmpty()) {
>>             return;
>>         }
>>         currentFile = new RandomAccessFile(fileName, "rw");
>>         currentFile.seek(fileOffset);
>>     }
>> }
>>
>> Input & output are protobuf messages; you could replace the input with
>> byte[] and remove the output generation and the calls to the collector.
>> The test should generate & store data for a while, so that at least one
>> checkpoint is triggered. I used a checkpoint interval of 5000 ms on a
>> quite slow system.
>> Every data chunk is about 1 KB.
>> Utils.startWriteToPcap - creates the new RandomAccessFile()
>> Utils.continueWriteToPcap - can be replaced with currentFile.write()
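>>
>> For completeness, a rough sketch of what those two helpers could look
>> like (hypothetical - the real Utils is more involved, and the packet
>> element type is assumed to be protobuf ByteString):
>>
>> public final class Utils {
>>     public static RandomAccessFile startWriteToPcap(
>>             String fileName, List<ByteString> packets) throws IOException {
>>         RandomAccessFile file = new RandomAccessFile(fileName, "rw");
>>         continueWriteToPcap(file, packets);
>>         return file;
>>     }
>>
>>     public static void continueWriteToPcap(
>>             RandomAccessFile file, List<ByteString> packets) throws IOException {
>>         // Append each packet's raw bytes at the current file position.
>>         for (ByteString packet : packets) {
>>             file.write(packet.toByteArray());
>>         }
>>     }
>> }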
>>
>> On Mon, Oct 11, 2021 at 16:50, JING ZHANG <beyond1...@gmail.com> wrote:
>>
>> Hi Alex,
>> It is a little weird.
>> Would you please provide a program which reproduces the problem,
>> including the DataStream job code and the related classes? I need to do
>> some debugging to find out the reason.
>>
>> Best,
>> JING ZHANG
>>
>>
>> On Mon, Oct 11, 2021 at 5:50 PM, Alex Drobinsky <alex.drobin...@gmail.com> wrote:
>>
>> Hi Jing Zhang,
>>
>> I'm using the FileSystem backend. I also implemented a readObject
>> function to support a proper restart procedure:
>>
>> private void readObject(ObjectInputStream ois)
>>         throws ClassNotFoundException, IOException {
>>     ois.defaultReadObject();
>>     logger.info("Deserialized MultiStorePacketState: " + this.toString());
>>
>>     // No need to do anything in case of an empty file
>>     if (fileName == null || fileName.isEmpty()) {
>>         return;
>>     }
>>     currentFile = new RandomAccessFile(fileName, "rw");
>>     currentFile.seek(fileOffset);
>> }
>>
>> However, according to the logs, this function was never called.
>>
>> Btw, it could be beneficial to add this kind of state object, e.g. a
>> FileState that encapsulates serialization/deserialization of a
>> RandomAccessFile, although the concept itself is a bit contradictory to
>> regular state.
>>
>> Currently, I have implemented and tested a workaround by adding the
>> boolean variable fileIsOpened, but it's awkward because I need to check
>> the state of the transient variable every time I use state.value(); see
>> the sketch below.
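>>
>> Roughly, the workaround looks like this (a sketch):
>>
>> MultiStorePacketState so = state.value();
>> // The transient currentFile can come back null even though the file is
>> // logically still open, so reopen it from the persisted fileName and
>> // fileOffset whenever the flag says it should be open.
>> if (so.fileIsOpened && so.currentFile == null) {
>>     so.currentFile = new RandomAccessFile(so.fileName, "rw");
>>     so.currentFile.seek(so.fileOffset);
>> }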
>>
>> So is it expected that transient variables in state are reset to their
>> default values?
>>
>>
>> On Mon, Oct 11, 2021 at 12:33, JING ZHANG <beyond1...@gmail.com> wrote:
>>
>> Hi, Alex
>> Which state backend do you use?
>> If you use MemoryStateBackend or FsStateBackend, the `transient` keyword
>> may have no effect, because those backends do not serialize state for
>> regular read/write accesses but keep it as objects on the heap.
>> If you use RocksDBStateBackend, I would consider this expected behavior,
>> because RocksDBStateBackend stores all state as byte arrays in embedded
>> RocksDB instances and therefore de/serializes the state of a key on
>> every read/write access. currentFile is null because a transient field
>> is not serialized by default; see the sketch below.
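>>
>> To illustrate with plain Java serialization (a self-contained sketch
>> using the simplified class from your first mail; RocksDB actually goes
>> through Flink's serializer stack, but the effect on a transient field
>> is the same):
>>
>> MultiStorePacketState original = new MultiStorePacketState();
>> original.fileName = "/tmp/session-0.pcap";
>> original.currentFile = new RandomAccessFile(original.fileName, "rw");
>>
>> ByteArrayOutputStream buffer = new ByteArrayOutputStream();
>> ObjectOutputStream out = new ObjectOutputStream(buffer);
>> out.writeObject(original);
>> out.close();
>>
>> ObjectInputStream in = new ObjectInputStream(
>>         new ByteArrayInputStream(buffer.toByteArray()));
>> MultiStorePacketState copy = (MultiStorePacketState) in.readObject();
>> // Without a custom readObject hook, copy.currentFile is null here.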
>>
>> Best,
>> JING ZHANG
>>
>>
>> On Mon, Oct 11, 2021 at 4:33 PM, Alex Drobinsky <alex.drobin...@gmail.com> wrote:
>>
>> Dear flink community,
>>
>> I have the following state class (irrelevant fields removed):
>> public class MultiStorePacketState implements Serializable {
>>
>>     public transient RandomAccessFile currentFile = null;
>>     public long timerValue;
>>     public String fileName;
>>     public String exportedFileName;
>>     public String sessionKey;
>>     public long fileOffset = 0;
>>
>> }
>>
>> Once in a while, currentFile becomes *nullified*; this happens after I
>> extract the state via
>>
>> MultiStorePacketState so = state.value();
>>
>> The frequency of this behaviour matches the checkpoint interval (the
>> checkpoint interval is defined as 5 seconds and the first occurrence of
>> the problem is also at 5 seconds); otherwise I don't have any clue
>> towards a possible explanation.
>>
>> Is this an expected side effect of the checkpoint procedure?
>>
>> Best regards,
>> Alex
>>
>>
