Hi Alex,

could you also log the identity hash code (or something similar) of the related instance? I suspect that it's not the field that is set to null, but that you get a clone in which the field is null. In that case, you need to add a specific KryoSerializer to initialize it (or just go with a lazy access pattern all the way).
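A minimal sketch of the suggested logging, assuming an SLF4J logger is available in the operator and using the state access pattern from the code quoted below; the log wording itself is illustrative:

MultiStorePacketState so = state.value();
logger.info("state identityHashCode={}, currentFile={}",
        System.identityHashCode(so), so.currentFile);
// If the hash code changes between accesses under the heap-based backend,
// state.value() handed back a copy rather than the original instance, and
// the transient field was simply never initialized in that copy.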
On Tue, Oct 12, 2021 at 2:55 PM Alex Drobinsky <alex.drobin...@gmail.com> wrote:

> Hi Jing,
>
> The job doesn't restart from the checkpoint; it's a brand new, clean job - no exceptions happened during execution, no restarts :)
> The state is a keyed state, so a new key means a new state - in that situation currentFile is equal to null, as expected, and this is handled without issues.
> Before I even thought to inquire about my questions, the first thing I did was add log messages with the value of currentFile in every place it could be changed.
> So I verified that before I release control back to Flink, currentFile has the correct value, and after I receive the value from state in the next iteration, it is set to null.
> The checkpoints by themselves could be irrelevant to the problem; the only indication of a connection is my observation that the interval between the first event and the first occurrence of nullification is exactly the same as the checkpoint interval.
>
> Yun Tang - you are correct, it's a KryoSerializer. If I remove the "transient" qualifier from currentFile, it crashes inside the KryoSerializer because RandomAccessFile isn't serializable.
> That also points to the fact that serialization was actually executed at least once. I will try an alternative approach - I will add my own writeObject implementation; it should work :)
>
> Best regards,
> Alex
>
> On Tue, Oct 12, 2021 at 15:07, JING ZHANG <beyond1...@gmail.com> wrote:
>
>> Hi Alex,
>> Since you use `FileSystemStateBackend`, I don't think currentFile becoming null once in a while is caused by the periodic checkpoint.
>>
>> If the job runs without a failover or a restore from a checkpoint, reading/writing value state on `FileSystemStateBackend` does not cause any serialization or deserialization at all. I have already simplified your code and verified this point. If you are interested, you can find the simplified code in the attachment of this email.
>>
>> Some possible reasons come to mind; I hope this helps.
>> 1. Does the job restore from a checkpoint/savepoint? This may be caused by a failover or by the user triggering stop-with-savepoint.
>> 2. If the job does not restore from a checkpoint or savepoint:
>> 2.1 After reading the MultiStorePacketState from ValueState, is there somewhere in your program that sets the currentFile field back to null? Because the state is stored on the heap, it may be changed if the program updates its value somewhere.
>> 2.2 When currentFile became null, is it possible that the current key had never appeared before? That is, if it is the first time the current key appears, getting the state would return the default value (a new MultiStorePacketState instance with a null currentFile).
>>
>> Best,
>> JING ZHANG
>>
>> On Tue, Oct 12, 2021 at 4:41 PM, Yun Tang <myas...@live.com> wrote:
>>
>>> Hi Alex,
>>>
>>> Since you use the customized MultiStorePacketState class as the value state type, Flink should use the KryoSerializer [1] to serialize your class, both when accessing RocksDB state and when checkpointing with FileSystemStateBackend, and I don't know whether Kryo would serialize your transient field.
>>> If you're not familiar with Flink's serialization stack, I think you could check the behaviors below:
>>>
>>> 1. Without any checkpoint restore, use FileSystemStateBackend and see whether the transient field can be read as expected; the answer should be yes.
>>> 2. After restoring from a checkpoint, check whether the transient field can be read back when using FileSystemStateBackend.
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/serialization/types_serialization/#flinks-typeinformation-class
>>>
>>> Best,
>>> Yun Tang
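A minimal sketch of the writeObject counterpart Alex mentions, mirroring the readObject implementation shown further down in the thread. Whether these hooks fire at all here is an assumption to verify: Kryo's default field serializer does not invoke writeObject/readObject, so this only takes effect where Java serialization is actually in play.

// Hypothetical addition to MultiStorePacketState; field names follow the
// state class quoted below.
private void writeObject(ObjectOutputStream oos) throws IOException {
    if (currentFile != null) {
        // Persist the current write position so readObject can seek back to it.
        fileOffset = currentFile.getFilePointer();
    }
    oos.defaultWriteObject(); // the transient currentFile is skipped automatically
}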
>>>
>>> ------------------------------
>>> *From:* Alex Drobinsky <alex.drobin...@gmail.com>
>>> *Sent:* Monday, October 11, 2021 22:37
>>> *To:* JING ZHANG <beyond1...@gmail.com>
>>> *Cc:* User-Flink <user@flink.apache.org>
>>> *Subject:* Re: Reset of transient variables in state to default values.
>>>
>>> It would be difficult to provide even a semblance of the complete product; however, I can try to provide enough details to reproduce the problem.
>>> A standard source would do:
>>>
>>> DataStream<byte[]> stream = env.addSource(
>>>         new FlinkKafkaConsumer<>(topic, new AbstractDeserializationSchema<byte[]>() {
>>>             @Override
>>>             public byte[] deserialize(byte[] bytes) throws IOException {
>>>                 return bytes;
>>>             }
>>>         }, properties)).name(topic);
>>>
>>> The operator body is something like this:
>>>
>>> public class MultiStorePacketFunction extends KeyedProcessFunction<String, SplitterToMultiStore, ClassifierOutput> implements Serializable {
>>>     private transient ValueState<MultiStorePacketState> state;
>>>
>>>     @Override
>>>     public void processElement(SplitterToMultiStore packet, Context ctx, Collector<ClassifierOutput> out) throws Exception {
>>>         if (packet.hasPackets()) {
>>>             // storedPackets is presumably a metrics Counter defined elsewhere in the class
>>>             storedPackets.inc(packet.getPackets().getPacketsCount());
>>>         }
>>>
>>>         MultiStorePacketState so = state.value();
>>>         if (process(packet, out, so, ctx)) {
>>>             // state.update(null) already clears the state, so the explicit
>>>             // clear() that follows is redundant (but harmless).
>>>             state.update(null);
>>>             state.clear();
>>>         } else {
>>>             state.update(so);
>>>         }
>>>     }
>>>
>>>     public String generateNextFilename(String sessionKey, int partNumber) {
>>>         String path = DirectoryService.getInstance().bookDirectory();
>>>         return path + File.separator + sessionKey + "-" + partNumber + ".pcap";
>>>     }
>>>
>>>     private void storeContent(Collector<ClassifierOutput> collector, MultiStorePacketState state, SplitterToMultiStore packets) throws Exception {
>>>         assert (packets != null);
>>>         assert (packets.hasPackets());
>>>
>>>         if (state.currentFile == null) {
>>>             openFile(collector, state, packets);
>>>         }
>>>
>>>         Utils.continueWriteToPcap(state.currentFile, packets.getPackets().getPacketsList());
>>>         state.fileOffset = state.currentFile.length();
>>>
>>>         tryToCloseFile(collector, state);
>>>     }
>>>
>>>     public static String extractExportedFileName(String fileName) {
>>>         String[] path = fileName.split("/");
>>>         return path[path.length - 2] + "/" + path[path.length - 1];
>>>     }
>>>
>>>     private void openFile(Collector<ClassifierOutput> collector, MultiStorePacketState state, SplitterToMultiStore packets) throws Exception {
>>>         state.fileIsOpened = true;
>>>         state.fileName = generateNextFilename(state.sessionKey, state.partNumber);
>>>         state.exportedFileName = extractExportedFileName(state.fileName);
>>>
>>>         // -> Here the RandomAccessFile is created
>>>         state.currentFile = Utils.startWriteToPcap(state.fileName, packets.getPackets().getPacketsList());
>>>         state.fileOffset = state.currentFile.length();
>>>         state.partNumber++;
>>>     }
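>>>     // Note: currentFile is the only piece of this state that cannot be
>>>     // serialized; everything needed to re-create it (fileName, fileOffset,
>>>     // fileIsOpened) is kept in plain serializable fields, which is what the
>>>     // readObject hook in MultiStorePacketState relies on to reopen the file.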
>>>     private void tryToCloseFile(Collector<ClassifierOutput> collector, MultiStorePacketState state) throws IOException {
>>>         if (state.currentFile.length() < StorePacketConfigurationParameters.partSizeLimit) {
>>>             return;
>>>         }
>>>         closeFile(collector, state);
>>>     }
>>>
>>>     private void closeFile(Collector<ClassifierOutput> collector, MultiStorePacketState state) throws IOException {
>>>         state.currentFile.close();
>>>         state.currentFile = null;
>>>         state.fileIsOpened = false;
>>>         ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
>>>         outputBuilder.getUsualBuilder().setFileName(state.exportedFileName);
>>>         outputBuilder.setSessionType(SessionType.Multi);
>>>         outputBuilder.setSessionKey(state.sessionKey);
>>>         var classifierOutput = outputBuilder.build();
>>>         state.sessionMetadata.add(classifierOutput);
>>>         collector.collect(classifierOutput);
>>>     }
>>>
>>>     public boolean process(SplitterToMultiStore packet, Collector<ClassifierOutput> collector, MultiStorePacketState so, Context context) throws Exception {
>>>         // First message
>>>         if (packet.hasClassificationResult()) {
>>>             sendClassificationResult(packet, collector, so);
>>>             return false;
>>>         }
>>>
>>>         // Last message
>>>         if (packet.hasSessionClosure()) {
>>>             if (so.isCoverageIncorrect) {
>>>                 return true;
>>>             }
>>>             handleSessionClosure(packet, collector, so, context);
>>>             return true;
>>>         }
>>>
>>>         if (so.isCoverageIncorrect) {
>>>             return false;
>>>         }
>>>         storeContent(collector, so, packet);
>>>
>>>         // The file could already be closed, e.g. because it reached the expected size.
>>>         if (so.currentFile != null) {
>>>             setupTimer(so, context.timerService());
>>>         }
>>>         return false;
>>>     }
>>>
>>>     private void handleSessionClosure(SplitterToMultiStore packet, Collector<ClassifierOutput> collector, MultiStorePacketState so, Context context) throws IOException {
>>>         if (so.currentFile != null) {
>>>             closeFile(collector, so);
>>>         }
>>>         ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
>>>         outputBuilder.setSessionKey(packet.getSessionKey());
>>>         outputBuilder.setSessionType(packet.getSessionType());
>>>         var messageBuilder = outputBuilder.getLastBuilder();
>>>         messageBuilder.addAllAggregatedSession(so.sessionMetadata);
>>>         outputBuilder.setLast(messageBuilder.build());
>>>         var output = outputBuilder.build();
>>>         collector.collect(output);
>>>         context.timerService().deleteProcessingTimeTimer(so.timerValue);
>>>         state.clear();
>>>     }
>>>
>>>     private void sendClassificationResult(SplitterToMultiStore packet, Collector<ClassifierOutput> collector, MultiStorePacketState so) {
>>>         var coverageResult = CoverageChecker.obtainCoverageInfo(packet.getClassificationResult().getLIID());
>>>         so.isCoverageIncorrect = !coverageResult.getValue0();
>>>         if (so.isCoverageIncorrect) {
>>>             return;
>>>         }
>>>
>>>         ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
>>>         outputBuilder.getFirstBuilder().setClassificationResult(packet.getClassificationResult().getClassificationResult());
>>>         outputBuilder.getFirstBuilder().setLIID(packet.getClassificationResult().getLIID());
>>>         outputBuilder.getFirstBuilder().setCIN(packet.getClassificationResult().getCIN());
>>>         outputBuilder.getFirstBuilder().setOperatorId(packet.getClassificationResult().getOperatorId());
>>>         outputBuilder.getFirstBuilder().setCoverage(coverageResult.getValue1());
>>>         outputBuilder.setSessionKey(packet.getSessionKey());
>>>         outputBuilder.setSessionType(packet.getSessionType());
>>>         so.sessionKey = packet.getSessionKey();
>>>         var classifierOutput = outputBuilder.build();
>>>         so.sessionMetadata.add(classifierOutput);
>>>         collector.collect(classifierOutput);
>>>     }
>>>
>>>     @Override
>>>     public void open(Configuration config) {
>>>         ValueStateDescriptor<MultiStorePacketState> descriptor =
>>>                 new ValueStateDescriptor<MultiStorePacketState>(
>>>                         "MultiStorePacketState", // the state name
>>>                         TypeInformation.of(new TypeHint<MultiStorePacketState>() {}), // type information
>>>                         new MultiStorePacketState()); // default value of the state, if nothing was set
>>>         state = getRuntimeContext().getState(descriptor);
>>>         StorePacketConfigurationParameters.init();
>>>     }
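>>>     // Note on open() above: the ValueStateDescriptor constructor that
>>>     // takes a default value is deprecated in recent Flink releases; when
>>>     // it is used, the default instance is copied through the state
>>>     // serializer each time a key with no stored value is read.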
>>>     @Override
>>>     public void onTimer(long timestamp, OnTimerContext ctx, Collector<ClassifierOutput> collector) throws Exception {
>>>         MultiStorePacketState so = state.value();
>>>         if (so.currentFile != null) {
>>>             closeFile(collector, so);
>>>         }
>>>
>>>         ctx.timerService().deleteProcessingTimeTimer(so.timerValue);
>>>         state.update(so);
>>>     }
>>>
>>>     private void setupTimer(MultiStorePacketState so, TimerService timerService) {
>>>         // Cancel the previous timer
>>>         timerService.deleteProcessingTimeTimer(so.timerValue);
>>>         // Register a new timer, rounded down to a whole second
>>>         so.timerValue = (timerService.currentProcessingTime() + StorePacketConfigurationParameters.partAggregationTimeout) / 1000 * 1000;
>>>         timerService.registerProcessingTimeTimer(so.timerValue);
>>>     }
>>> }
>>>
>>> The state class looks like the following:
>>>
>>> public class MultiStorePacketState implements Serializable {
>>>     private /*static*/ Logger logger = LoggerFactory.getLogger(MultiStorePacketState.class);
>>>     public transient RandomAccessFile currentFile = null;
>>>     public long timerValue;
>>>     public String fileName;
>>>     public String exportedFileName;
>>>     public String sessionKey;
>>>     public long fileOffset = 0;
>>>     public int partNumber = 0;
>>>     public boolean isCoverageIncorrect = false;
>>>     public boolean fileIsOpened = false;
>>>     public ArrayList<ClassifierOutput> sessionMetadata = new ArrayList<>();
>>>
>>>     private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
>>>         ois.defaultReadObject();
>>>         logger.info("Deserialized MultiStorePacketState: " + this.toString());
>>>
>>>         // No need to do anything in case of an empty file
>>>         if (fileName.isEmpty()) {
>>>             return;
>>>         }
>>>         currentFile = new RandomAccessFile(fileName, "rw");
>>>         currentFile.seek(fileOffset);
>>>     }
>>> }
>>>
>>> Input and output are Proto messages; you could replace the input with byte[] and remove the output generation and the calls to the collector.
>>> The test should generate and store data for a while, so that at least one checkpoint is triggered. I used a checkpoint interval of 5000 ms on a quite slow system.
>>> Every data chunk is about 1 KB.
>>> Utils.startWriteToPcap - new RandomAccessFile()
>>> Utils.writeToPcap - should be replaced with currentFile.write()
>>>
>>> On Mon, Oct 11, 2021 at 16:50, JING ZHANG <beyond1...@gmail.com> wrote:
>>>
>>> Hi Alex,
>>> It is a little weird.
>>> Would you please provide a program which can reproduce the problem, including the DataStream job code and the related classes? I need to do some debugging to find out the reason.
>>>
>>> Best,
>>> JING ZHANG
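A minimal sketch of the checkpoint setup from the reproduction notes above, assuming the 5000 ms interval and the FileSystem backend named in the thread (the checkpoint directory is illustrative):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 5 s interval - the same period after which currentFile was observed to become null
env.enableCheckpointing(5000);
// Heap-based FileSystem state backend; the path is an assumption
env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"));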
>>> On Mon, Oct 11, 2021 at 5:50 PM, Alex Drobinsky <alex.drobin...@gmail.com> wrote:
>>>
>>> Hi Jing Zhang,
>>>
>>> I'm using the FileSystem backend. I also implemented a readObject function to support a proper restart procedure:
>>>
>>> private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
>>>     ois.defaultReadObject();
>>>     logger.info("Deserialized MultiStorePacketState: " + this.toString());
>>>
>>>     // No need to do anything in case of an empty file
>>>     if (fileName.isEmpty()) {
>>>         return;
>>>     }
>>>     currentFile = new RandomAccessFile(fileName, "rw");
>>>     currentFile.seek(fileOffset);
>>> }
>>>
>>> However, according to the logs, this function wasn't called.
>>>
>>> By the way, it could be beneficial to add this kind of state object, e.g. a FileState that encapsulates serialization/deserialization of a RandomAccessFile, although the concept itself is a bit contradictory to regular state.
>>>
>>> Currently, I have implemented and tested a workaround by adding the boolean variable isFileOpened; however, it's awkward because I need to check the state of the transient variable every time I use state.value().
>>>
>>> So, should it be expected that transient variables in state are reset to their default values?
>>>
>>> On Mon, Oct 11, 2021 at 12:33, JING ZHANG <beyond1...@gmail.com> wrote:
>>>
>>> Hi Alex,
>>> Which state backend do you use?
>>> If you choose MemoryStateBackend or FsStateBackend, the `transient` keyword may have no effect, because these backends do not serialize state for regular read/write accesses but keep it as objects on the heap.
>>> If you choose RocksDBStateBackend, I would consider this expected behavior, because RocksDBStateBackend stores all state as byte arrays in embedded RocksDB instances. It therefore de/serializes the state of a key on every read/write access, and currentFile is null because transient fields are not serialized by default.
>>>
>>> Best,
>>> JING ZHANG
>>>
>>> On Mon, Oct 11, 2021 at 4:33 PM, Alex Drobinsky <alex.drobin...@gmail.com> wrote:
>>>
>>> Dear Flink community,
>>>
>>> I have the following state class (irrelevant fields removed):
>>>
>>> public class MultiStorePacketState implements Serializable {
>>>
>>>     public transient RandomAccessFile currentFile = null;
>>>     public long timerValue;
>>>     public String fileName;
>>>     public String exportedFileName;
>>>     public String sessionKey;
>>>     public long fileOffset = 0;
>>>
>>> }
>>>
>>> Once in a while, currentFile becomes *nullified*; this happens after I extract the state via
>>>
>>> MultiStorePacketState so = state.value();
>>>
>>> The frequency of this behaviour matches the checkpoint interval (the checkpoint interval is defined as 5 seconds, and the first occurrence of the problem is also at 5 seconds); otherwise I don't have any clues toward a possible explanation.
>>>
>>> Is this an expected side effect of the checkpoint procedure?
>>>
>>> Best regards,
>>> Alex
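A minimal sketch of the lazy access pattern suggested at the top of the thread, built on the isFileOpened workaround Alex describes: treat currentFile as a lazily restored handle and reopen it from the serializable fields whenever the transient reference has been lost. The helper name is hypothetical; the fields follow the state class above.

private RandomAccessFile currentFileOf(MultiStorePacketState so) throws IOException {
    // Reopen the handle if it was dropped by a copy/deserialization cycle
    // while the state says a part file is logically open.
    if (so.currentFile == null && so.fileIsOpened) {
        so.currentFile = new RandomAccessFile(so.fileName, "rw");
        so.currentFile.seek(so.fileOffset);
    }
    return so.currentFile;
}

Every call site then goes through currentFileOf(so) instead of reading so.currentFile directly, which removes the need to re-check the transient field manually after each state.value().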