Hi Arvid,

That sounds like a good direction. Do I need to register my state class with a Kryo serializer, similar to this?
env.getConfig().registerTypeWithKryoSerializer(IPSessionOrganizer.proto.SourceOutput.class, ProtobufSerializer.class);

Mon, 18 Oct 2021 at 10:32, Arvid Heise <ar...@apache.org>:

> Hi Alex,
>
> could you also log the identity hashcode (or something similar) of the
> related instance? I suspect that it's not the field that is set to null but
> that you get a clone where the field is null. In that case, you need to add
> a specific KryoSerializer to initialize it (or just go with a lazy access
> pattern all the way).
>
> On Tue, Oct 12, 2021 at 2:55 PM Alex Drobinsky <alex.drobin...@gmail.com>
> wrote:
>
>> Hi Jing,
>>
>> The job doesn't restart from a checkpoint; it's a brand new clean job, no
>> exceptions happened during execution, no restarts :)
>> The state is a Keyed State, so a new key means a new State - in this
>> situation currentFile is equal to null - as expected and handled without
>> issues.
>> Before I even thought to ask my questions, the first thing I did was
>> add log messages with the value of currentFile in every place it
>> could be changed.
>> So I checked that before I release control to Flink, currentFile has
>> the correct value, and after I receive the value from state in the next
>> iteration it's set to null.
>> The checkpoints by themselves could be irrelevant to the problem; the
>> only indication of a connection is my assumption, based on the observation
>> that the interval between the first event and the first occurrence of
>> nullification is exactly the same as the checkpoint interval.
>>
>> Yun Tang - you are correct, it's a KryoSerializer; if I remove the
>> "transient" qualifier from currentFile, it crashes inside of KryoSerializer
>> because RandomAccessFile isn't serializable.
>> Which also points to the fact that serialization was actually executed
>> at least once. I will try an alternative approach - I will add my own
>> writeObject implementation, it should work :)
>>
>> Best regards,
>> Alex
>>
>> Tue, 12 Oct 2021 at 15:07, JING ZHANG <beyond1...@gmail.com>:
>>
>>> Hi Alex,
>>> Since you use `FileSystemStateBackend`, I think currentFile becoming
>>> nullified once in a while is not caused by periodic checkpoints.
>>>
>>> Because if the job is running without failover or restore from a
>>> checkpoint, reading/writing value state on `FileSystemStateBackend` does
>>> not cause serialization and deserialization at all. I have already
>>> simplified your code and verified this point. If you are interested, you
>>> can find the simplified code in the attachment of the email.
>>>
>>> Some possible reasons come to mind; I hope this helps.
>>> 1. Does the job restore from a checkpoint/savepoint? This may be caused by
>>> failover or a user-triggered stop-with-savepoint.
>>> 2. If the job does not restore from a checkpoint or savepoint:
>>> 2.1 After reading the MultiStorePacketState from ValueState, is there
>>> somewhere in your program that updates the currentFile field to null
>>> again? Because the state is stored on the heap, it may be changed if the
>>> program updates its value somewhere.
>>> 2.2 When currentFile became null, is it possible that the current key
>>> never appeared before? That is, it's the first time that the current key
>>> appears, so getting the state would return the default value (a new
>>> MultiStorePacketState instance with a null currentFile).
>>>
>>> Best,
>>> JING ZHANG
>>>
>>> Yun Tang <myas...@live.com> wrote on Tue, 12 Oct 2021 at 4:41 PM:
>>>
>>>> Hi Alex,
>>>>
>>>> Since you use the customized MultiStorePacketState class as the value
>>>> state type, it should use the Kryo serializer [1] to serialize your class
>>>> when accessing RocksDB state or checkpointing via FileSystemStateBackend,
>>>> and I don't know whether Kryo would serialize your transient field.
>>>> If you're not familiar with Flink's serialization stack, I think you
>>>> could check the behaviors below:
>>>>
>>>> 1. Without any checkpoint restore, use FileSystemStateBackend to
>>>> see whether the transient field can be read as expected; the answer
>>>> should be yes.
>>>> 2. After restoring from a checkpoint, check whether the transient field
>>>> can be read back when using FileSystemStateBackend.
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/serialization/types_serialization/#flinks-typeinformation-class
>>>>
>>>> Best
>>>> Yun Tang
>>>>
>>>> ------------------------------
>>>> *From:* Alex Drobinsky <alex.drobin...@gmail.com>
>>>> *Sent:* Monday, October 11, 2021 22:37
>>>> *To:* JING ZHANG <beyond1...@gmail.com>
>>>> *Cc:* User-Flink <user@flink.apache.org>
>>>> *Subject:* Re: Reset of transient variables in state to default values.
>>>>
>>>> It would be difficult to provide even a semblance of the complete
>>>> product; however, I could try to provide enough details to reproduce the
>>>> problem.
>>>> A standard source would do:
>>>>
>>>> DataStream<byte[]> stream = env.addSource(
>>>>     new FlinkKafkaConsumer<>(topic, new AbstractDeserializationSchema<byte[]>() {
>>>>         @Override
>>>>         public byte[] deserialize(byte[] bytes) throws IOException {
>>>>             return bytes;
>>>>         }
>>>>     }, properties)).name(topic);
>>>>
>>>> The operator body is something like:
>>>>
>>>> public class MultiStorePacketFunction extends KeyedProcessFunction<String, SplitterToMultiStore, ClassifierOutput> implements Serializable {
>>>>     private transient ValueState<MultiStorePacketState> state;
>>>>
>>>>     @Override
>>>>     public void processElement(SplitterToMultiStore packet, Context ctx, Collector<ClassifierOutput> out) throws Exception {
>>>>         if (packet.hasPackets()) {
>>>>             storedPackets.inc(packet.getPackets().getPacketsCount());
>>>>         }
>>>>
>>>>         MultiStorePacketState so = state.value();
>>>>         if (process(packet, out, so, ctx)) {
>>>>             state.update(null);
>>>>             state.clear();
>>>>         } else {
>>>>             state.update(so);
>>>>         }
>>>>     }
>>>>
>>>>     public String generateNextFilename(String sessionKey, int partNumber) {
>>>>         String path = DirectoryService.getInstance().bookDirectory();
>>>>         return path + File.separator + sessionKey + "-" + partNumber + ".pcap";
>>>>     }
>>>>
>>>>     private void storeContent(Collector<ClassifierOutput> collector, MultiStorePacketState state, SplitterToMultiStore packets) throws Exception {
>>>>         assert (packets != null);
>>>>         assert (packets.hasPackets());
>>>>
>>>>         if (state.currentFile == null) {
>>>>             openFile(collector, state, packets);
>>>>         }
>>>>
>>>>         Utils.continueWriteToPcap(state.currentFile, packets.getPackets().getPacketsList());
>>>>         state.fileOffset = state.currentFile.length();
>>>>
>>>>         tryToCloseFile(collector, state);
>>>>     }
>>>>
>>>>     static public String extractExportedFileName(String fileName) {
>>>>         String path[] = fileName.split("/");
>>>>         return path[path.length - 2] + "/" + path[path.length - 1];
>>>>     }
>>>>
>>>>     private void openFile(Collector<ClassifierOutput> collector, MultiStorePacketState state, SplitterToMultiStore packets) throws Exception {
>>>>         state.fileIsOpened = true;
>>>>         state.fileName = generateNextFilename(state.sessionKey, state.partNumber);
>>>>         state.exportedFileName = extractExportedFileName(state.fileName);
>>>>
>>>>         // -> Here the RandomAccessFile is created
>>>>         state.currentFile = Utils.startWriteToPcap(state.fileName, packets.getPackets().getPacketsList());
>>>>         state.fileOffset = state.currentFile.length();
>>>>         state.partNumber++;
>>>>     }
>>>>
>>>>     private void tryToCloseFile(Collector<ClassifierOutput> collector, MultiStorePacketState state) throws IOException {
>>>>         if (state.currentFile.length() < StorePacketConfigurationParameters.partSizeLimit) {
>>>>             return;
>>>>         }
>>>>         closeFile(collector, state);
>>>>     }
>>>>
>>>>     private void closeFile(Collector<ClassifierOutput> collector, MultiStorePacketState state) throws IOException {
>>>>         state.currentFile.close();
>>>>         state.currentFile = null;
>>>>         state.fileIsOpened = false;
>>>>         ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
>>>>         outputBuilder.getUsualBuilder().setFileName(state.exportedFileName);
>>>>         outputBuilder.setSessionType(SessionType.Multi);
>>>>         outputBuilder.setSessionKey(state.sessionKey);
>>>>         var classifierOutput = outputBuilder.build();
>>>>         state.sessionMetadata.add(classifierOutput);
>>>>         collector.collect(classifierOutput);
>>>>     }
>>>>
>>>>     public boolean process(SplitterToMultiStore packet, Collector<ClassifierOutput> collector, MultiStorePacketState so, Context context) throws Exception {
>>>>
>>>>         // First message
>>>>         if (packet.hasClassificationResult()) {
>>>>             sendClassificationResult(packet, collector, so);
>>>>             return false;
>>>>         }
>>>>
>>>>         // Last message
>>>>         if (packet.hasSessionClosure()) {
>>>>             if (so.isCoverageIncorrect) {
>>>>                 return true;
>>>>             }
>>>>             handleSessionClosure(packet, collector, so, context);
>>>>             return true;
>>>>         }
>>>>
>>>>         if (so.isCoverageIncorrect) {
>>>>             return false;
>>>>         }
>>>>         storeContent(collector, so, packet);
>>>>
>>>>         // The file could already be closed, e.g. it reached the expected size.
>>>>         if (so.currentFile != null) {
>>>>             setupTimer(so, context.timerService());
>>>>         }
>>>>         return false;
>>>>     }
>>>>
>>>>     private void handleSessionClosure(SplitterToMultiStore packet, Collector<ClassifierOutput> collector, MultiStorePacketState so, Context context) throws IOException {
>>>>         if (so.currentFile != null) {
>>>>             closeFile(collector, so);
>>>>         }
>>>>         ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
>>>>         outputBuilder.setSessionKey(packet.getSessionKey());
>>>>         outputBuilder.setSessionType(packet.getSessionType());
>>>>         var messageBuilder = outputBuilder.getLastBuilder();
>>>>         messageBuilder.addAllAggregatedSession(so.sessionMetadata);
>>>>         outputBuilder.setLast(messageBuilder.build());
>>>>         var output = outputBuilder.build();
>>>>         collector.collect(output);
>>>>         context.timerService().deleteProcessingTimeTimer(so.timerValue);
>>>>         state.clear();
>>>>     }
>>>>
>>>>     private void sendClassificationResult(SplitterToMultiStore packet, Collector<ClassifierOutput> collector, MultiStorePacketState so) {
>>>>         var coverageResult = CoverageChecker.obtainCoverageInfo(packet.getClassificationResult().getLIID());
>>>>         so.isCoverageIncorrect = !coverageResult.getValue0();
>>>>         if (so.isCoverageIncorrect) {
>>>>             return;
>>>>         }
>>>>
>>>>         ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
>>>>         outputBuilder.getFirstBuilder().setClassificationResult(packet.getClassificationResult().getClassificationResult());
>>>>         outputBuilder.getFirstBuilder().setLIID(packet.getClassificationResult().getLIID());
>>>>         outputBuilder.getFirstBuilder().setCIN(packet.getClassificationResult().getCIN());
>>>>         outputBuilder.getFirstBuilder().setOperatorId(packet.getClassificationResult().getOperatorId());
>>>>         outputBuilder.getFirstBuilder().setCoverage(coverageResult.getValue1());
>>>>         outputBuilder.setSessionKey(packet.getSessionKey());
>>>>         outputBuilder.setSessionType(packet.getSessionType());
>>>>         so.sessionKey = packet.getSessionKey();
>>>>         var classifierOutput = outputBuilder.build();
>>>>         so.sessionMetadata.add(classifierOutput);
>>>>         collector.collect(classifierOutput);
>>>>     }
>>>>
>>>>     @Override
>>>>     public void open(Configuration config) {
>>>>         ValueStateDescriptor<MultiStorePacketState> descriptor =
>>>>             new ValueStateDescriptor<MultiStorePacketState>(
>>>>                 "MultiStorePacketState", // the state name
>>>>                 TypeInformation.of(new TypeHint<MultiStorePacketState>() {}), // type information
>>>>                 new MultiStorePacketState()); // default value of the state, if nothing was set
>>>>         state = getRuntimeContext().getState(descriptor);
>>>>         StorePacketConfigurationParameters.init();
>>>>     }
>>>>
>>>>     @Override
>>>>     public void onTimer(long timestamp, OnTimerContext ctx, Collector<ClassifierOutput> collector) throws Exception {
>>>>         MultiStorePacketState so = state.value();
>>>>         if (so.currentFile != null) {
>>>>             closeFile(collector, so);
>>>>         }
>>>>
>>>>         ctx.timerService().deleteProcessingTimeTimer(so.timerValue);
>>>>         state.update(so);
>>>>     }
>>>>
>>>>     private void setupTimer(MultiStorePacketState so, TimerService timerService) {
>>>>         // Cancel previous timer
>>>>         timerService.deleteProcessingTimeTimer(so.timerValue);
>>>>         // Register new timer
>>>>         so.timerValue = (timerService.currentProcessingTime() + StorePacketConfigurationParameters.partAggregationTimeout)/1000*1000;
>>>>         timerService.registerProcessingTimeTimer(so.timerValue);
>>>>     }
>>>> }
>>>>
>>>> The state class looks like the following:
>>>>
>>>> public class MultiStorePacketState implements Serializable {
>>>>     private /*static*/ Logger logger = LoggerFactory.getLogger(MultiStorePacketState.class);
>>>>     public transient RandomAccessFile currentFile = null;
>>>>     public long timerValue;
>>>>     public String fileName;
>>>>     public String exportedFileName;
>>>>     public String sessionKey;
>>>>     public long fileOffset = 0;
>>>>     public int partNumber = 0;
>>>>     public boolean isCoverageIncorrect = false;
>>>>     public boolean fileIsOpened = false;
>>>>     public ArrayList<ClassifierOutput> sessionMetadata = new ArrayList<>();
>>>>
>>>>     private void readObject(ObjectInputStream ois)
>>>>             throws ClassNotFoundException, IOException {
>>>>         ois.defaultReadObject();
>>>>         logger.info("Deserialized MultiStorePacketState: " + this.toString());
>>>>
>>>>         // No need to do anything in case of empty file
>>>>         if (fileName.isEmpty()) {
>>>>             return;
>>>>         }
>>>>         currentFile = new RandomAccessFile(fileName,"rw");
>>>>         currentFile.seek(fileOffset);
>>>>     }
>>>> }
>>>>
>>>> Input & output are Proto files; you could replace the input with byte[]
>>>> and remove output generation and calls to the collector.
>>>> The test should generate & store data for a while, so that a checkpoint
>>>> is triggered at least once. I used a checkpoint interval of 5000 ms on a
>>>> quite slow system.
>>>> Every data chunk is about 1k.
>>>> Utils.startWriteToPcap - new RandomAccessFile()
>>>> Utils.writeToPcap - should be replaced with currentFile.write()
>>>>
>>>> Mon, 11 Oct 2021 at 16:50, JING ZHANG <beyond1...@gmail.com>:
>>>>
>>>> Hi Alex,
>>>> It is a little weird.
>>>> Would you please provide a program that reproduces the problem,
>>>> including the DataStream job code and related classes? I need to do some
>>>> debugging to find out the reason.
>>>>
>>>> Best,
>>>> JING ZHANG
>>>>
>>>> Alex Drobinsky <alex.drobin...@gmail.com> wrote on Mon, 11 Oct 2021 at 5:50 PM:
>>>>
>>>> Hi Jing Zhang,
>>>>
>>>> I'm using the FileSystem backend.
>>>> I also implemented a readObject
>>>> function to support a proper restart procedure:
>>>>
>>>> private void readObject(ObjectInputStream ois)
>>>>         throws ClassNotFoundException, IOException {
>>>>     ois.defaultReadObject();
>>>>     logger.info("Deserialized MultiStorePacketState: " + this.toString());
>>>>
>>>>     // No need to do anything in case of empty file
>>>>     if (fileName.isEmpty()) {
>>>>         return;
>>>>     }
>>>>     currentFile = new RandomAccessFile(fileName,"rw");
>>>>     currentFile.seek(fileOffset);
>>>> }
>>>>
>>>> However, according to the logs this function wasn't called.
>>>>
>>>> Btw, it could be beneficial to add this kind of State object, e.g.
>>>> FileState, which would encapsulate serialization / deserialization for
>>>> RandomAccessFile, although the concept itself is a bit contradictory to
>>>> regular state.
>>>>
>>>> Currently, I implemented and tested a workaround via the addition of the
>>>> boolean variable fileIsOpened; however, it's awkward because I need to
>>>> check the state of the transient variable every time I use state.value().
>>>>
>>>> So should it be expected that transient variables in state would be
>>>> reset to default values?
>>>>
>>>> Mon, 11 Oct 2021 at 12:33, JING ZHANG <beyond1...@gmail.com>:
>>>>
>>>> Hi, Alex
>>>> What state backend do you choose?
>>>> If you choose MemoryStateBackend or FsStateBackend, the `transient`
>>>> keyword may not have an effect because MemoryStateBackend does not
>>>> serialize state for regular read/write accesses but keeps it as objects
>>>> on the heap.
>>>> If you choose RocksDBStateBackend, I think it is expected behavior
>>>> because RocksDBStateBackend stores all state as byte arrays in embedded
>>>> RocksDB instances. Therefore, it de/serializes the state of a key for
>>>> every read/write access. currentFile is null because the transient
>>>> variable would not be serialized by default.
>>>>
>>>> Best,
>>>> JING ZHANG
>>>>
>>>> Alex Drobinsky <alex.drobin...@gmail.com> wrote on Mon, 11 Oct 2021 at 4:33 PM:
>>>>
>>>> Dear Flink community,
>>>>
>>>> I have the following state class (irrelevant fields removed):
>>>>
>>>> public class MultiStorePacketState implements Serializable {
>>>>
>>>>     public transient RandomAccessFile currentFile = null;
>>>>     public long timerValue;
>>>>     public String fileName;
>>>>     public String exportedFileName;
>>>>     public String sessionKey;
>>>>     public long fileOffset = 0;
>>>>
>>>> }
>>>>
>>>> Once in a while, currentFile becomes *nullified*; this happens after I
>>>> extract the state via
>>>>
>>>> MultiStorePacketState so = state.value();
>>>>
>>>> The frequency of this behaviour is similar to the checkpoint interval
>>>> (the checkpoint interval is defined as 5 seconds and the first occurrence
>>>> of this problem is also after 5 seconds); otherwise I don't have any
>>>> clues to a possible explanation.
>>>>
>>>> Is it an expected side effect of the checkpoint procedure?
>>>>
>>>> Best regards,
>>>> Alex
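
For reference, the "lazy access pattern" mentioned in the thread could look roughly like the sketch below. It is only an illustration under stated assumptions: LazyFileState, file(), isOpen(), close() and roundTrip() are hypothetical names that do not exist in the job above, and plain Java serialization is used here merely as a stand-in for whatever copy Flink hands back from state.value(). The idea is that callers never touch the transient handle directly, so a copy that arrives with a null handle reopens the file from fileName and fileOffset on first use.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.RandomAccessFile;
import java.io.Serializable;

// Hypothetical stand-in for MultiStorePacketState; only file-related fields are kept.
class LazyFileState implements Serializable {
    private static final long serialVersionUID = 1L;

    public String fileName = "";
    public long fileOffset = 0;

    // Transient: skipped during serialization, so a deserialized copy starts with null here.
    private transient RandomAccessFile currentFile;

    // True only if this particular instance currently holds an open handle.
    boolean isOpen() {
        return currentFile != null;
    }

    // Lazy accessor: reopens the file at the recorded offset if the handle is missing.
    // Returns null when no file name has been recorded yet.
    RandomAccessFile file() throws IOException {
        if (currentFile == null) {
            if (fileName == null || fileName.isEmpty()) {
                return null;
            }
            currentFile = new RandomAccessFile(fileName, "rw");
            currentFile.seek(fileOffset);
        }
        return currentFile;
    }

    // Closes the handle (if any) and forgets it.
    void close() throws IOException {
        if (currentFile != null) {
            currentFile.close();
            currentFile = null;
        }
    }

    // Assumption: Java serialization imitates the copy a state backend may return.
    static LazyFileState roundTrip(LazyFileState in) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(in);
        }
        try (ObjectInputStream ois =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (LazyFileState) ois.readObject();
        }
    }
}
```

With this shape, code such as storeContent would call state.file() instead of reading state.currentFile, and no separate fileIsOpened check would be needed after each state.value(). Whether that fits the real job depends on details not shown in this thread.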