It would be difficult to provide anything resembling the complete product,
but I can try to provide enough details to reproduce the problem.
A standard source would do:

DataStream<byte[]> stream = env.addSource(
        new FlinkKafkaConsumer<>(topic, new AbstractDeserializationSchema<byte[]>() {
            @Override
            public byte[] deserialize(byte[] bytes) throws IOException {
                return bytes;
            }
        }, properties)).name(topic);


The operator body looks something like this:

public class MultiStorePacketFunction
      extends KeyedProcessFunction<String, SplitterToMultiStore, ClassifierOutput>
      implements Serializable {
   private transient ValueState<MultiStorePacketState> state;

   @Override
   public void processElement(SplitterToMultiStore packet, Context ctx,
         Collector<ClassifierOutput> out) throws Exception {
      if (packet.hasPackets()) {
         storedPackets.inc(packet.getPackets().getPacketsCount());
      }

      MultiStorePacketState so = state.value();
      if (process(packet, out, so, ctx)) {
         // Session finished: drop this key's state.
         state.clear();
      } else {
         state.update(so);
      }
   }

   public String generateNextFilename(String sessionKey, int partNumber) {
      String path = DirectoryService.getInstance().bookDirectory();
      return path + File.separator + sessionKey + "-" + partNumber + ".pcap";
   }

   private void storeContent(Collector<ClassifierOutput> collector,
         MultiStorePacketState state, SplitterToMultiStore packets) throws Exception {
      assert (packets != null);
      assert (packets.hasPackets());

      if (state.currentFile == null) {
         openFile(collector, state, packets);
      }

      Utils.continueWriteToPcap(state.currentFile,
            packets.getPackets().getPacketsList());
      state.fileOffset = state.currentFile.length();

      tryToCloseFile(collector, state);
   }

   public static String extractExportedFileName(String fileName) {
      String[] path = fileName.split("/");
      return path[path.length - 2] + "/" + path[path.length - 1];
   }

   private void openFile(Collector<ClassifierOutput> collector,
         MultiStorePacketState state, SplitterToMultiStore packets) throws Exception {
      state.fileIsOpened = true;
      state.fileName = generateNextFilename(state.sessionKey, state.partNumber);
      state.exportedFileName = extractExportedFileName(state.fileName);

      // The RandomAccessFile is created here.
      state.currentFile = Utils.startWriteToPcap(state.fileName,
            packets.getPackets().getPacketsList());
      state.fileOffset = state.currentFile.length();
      state.partNumber++;
   }

   private void tryToCloseFile(Collector<ClassifierOutput> collector,
         MultiStorePacketState state) throws IOException {
      if (state.currentFile.length() < StorePacketConfigurationParameters.partSizeLimit) {
         return;
      }
      closeFile(collector, state);
   }

   private void closeFile(Collector<ClassifierOutput> collector,
         MultiStorePacketState state) throws IOException {
      state.currentFile.close();
      state.currentFile = null;
      state.fileIsOpened = false;
      ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
      outputBuilder.getUsualBuilder().setFileName(state.exportedFileName);
      outputBuilder.setSessionType(SessionType.Multi);
      outputBuilder.setSessionKey(state.sessionKey);
      var classifierOutput = outputBuilder.build();
      state.sessionMetadata.add(classifierOutput);
      collector.collect(classifierOutput);
   }

   public boolean process(SplitterToMultiStore packet,
         Collector<ClassifierOutput> collector, MultiStorePacketState so,
         Context context) throws Exception {

      // First message
      if (packet.hasClassificationResult()) {
         sendClassificationResult(packet, collector, so);
         return false;
      }

      // Last message
      if (packet.hasSessionClosure()) {
         if (so.isCoverageIncorrect) {
            return true;
         }
         handleSessionClosure(packet, collector, so, context);
         return true;
      }

      if (so.isCoverageIncorrect) {
         return false;
      }
      storeContent(collector, so, packet);

      // The file may already be closed, e.g. because it reached the size limit.
      if (so.currentFile != null) {
         setupTimer(so, context.timerService());
      }
      return false;
   }

   private void handleSessionClosure(SplitterToMultiStore packet,
         Collector<ClassifierOutput> collector, MultiStorePacketState so,
         Context context) throws IOException {
      if (so.currentFile != null) {
         closeFile(collector, so);
      }
      ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
      outputBuilder.setSessionKey(packet.getSessionKey());
      outputBuilder.setSessionType(packet.getSessionType());
      var messageBuilder = outputBuilder.getLastBuilder();
      messageBuilder.addAllAggregatedSession(so.sessionMetadata);
      outputBuilder.setLast(messageBuilder.build());
      var output = outputBuilder.build();
      collector.collect(output);
      context.timerService().deleteProcessingTimeTimer(so.timerValue);
      state.clear();
   }

   private void sendClassificationResult(SplitterToMultiStore packet,
         Collector<ClassifierOutput> collector, MultiStorePacketState so) {
      var coverageResult = CoverageChecker.obtainCoverageInfo(
            packet.getClassificationResult().getLIID());
      so.isCoverageIncorrect = !coverageResult.getValue0();
      if (so.isCoverageIncorrect) {
         return;
      }

      ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
      outputBuilder.getFirstBuilder().setClassificationResult(
            packet.getClassificationResult().getClassificationResult());
      outputBuilder.getFirstBuilder().setLIID(packet.getClassificationResult().getLIID());
      outputBuilder.getFirstBuilder().setCIN(packet.getClassificationResult().getCIN());
      outputBuilder.getFirstBuilder().setOperatorId(
            packet.getClassificationResult().getOperatorId());
      outputBuilder.getFirstBuilder().setCoverage(coverageResult.getValue1());
      outputBuilder.setSessionKey(packet.getSessionKey());
      outputBuilder.setSessionType(packet.getSessionType());
      so.sessionKey = packet.getSessionKey();
      var classifierOutput = outputBuilder.build();
      so.sessionMetadata.add(classifierOutput);
      collector.collect(classifierOutput);
   }

   @Override
   public void open(Configuration config) {
      ValueStateDescriptor<MultiStorePacketState> descriptor =
            new ValueStateDescriptor<>(
                  "MultiStorePacketState", // the state name
                  TypeInformation.of(new TypeHint<MultiStorePacketState>() {}), // type information
                  new MultiStorePacketState()); // default value of the state, if nothing was set
      state = getRuntimeContext().getState(descriptor);
      StorePacketConfigurationParameters.init();
   }

   @Override
   public void onTimer(long timestamp, OnTimerContext ctx,
         Collector<ClassifierOutput> collector) throws Exception {
      MultiStorePacketState so = state.value();
      if (so.currentFile != null) {
         closeFile(collector, so);
      }

      ctx.timerService().deleteProcessingTimeTimer(so.timerValue);
      state.update(so);
   }

   private void setupTimer(MultiStorePacketState so, TimerService timerService) {
      // Cancel previous timer
      timerService.deleteProcessingTimeTimer(so.timerValue);
      // Register new timer
      // Deadline rounded down to a whole second.
      so.timerValue = (timerService.currentProcessingTime()
            + StorePacketConfigurationParameters.partAggregationTimeout) / 1000 * 1000;
      timerService.registerProcessingTimeTimer(so.timerValue);
   }
}
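The deadline arithmetic in setupTimer relies on integer division to round down to a whole second. A minimal sketch of just that step, using a hypothetical TimerRounding helper and an assumed partAggregationTimeout of 3 s (the real value is not shown in the thread):

```java
// Hypothetical helper reproducing the deadline arithmetic from setupTimer.
class TimerRounding {
    // For positive values, integer division truncates, i.e. rounds down to a whole second.
    static long roundedDeadline(long nowMillis, long timeoutMillis) {
        return (nowMillis + timeoutMillis) / 1000 * 1000;
    }

    public static void main(String[] args) {
        long timeout = 3_000; // assumed value for partAggregationTimeout
        System.out.println(roundedDeadline(1_000_123, timeout)); // 1003000
        System.out.println(roundedDeadline(1_000_987, timeout)); // 1003000
        System.out.println(roundedDeadline(1_001_000, timeout)); // 1004000
    }
}
```

Packets for the same key that arrive within the same second map to the same timer timestamp, and Flink keeps at most one timer per key and timestamp. The trade-off is that the effective timeout can be up to 999 ms shorter than the configured value.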



The state class looks like this:

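import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.RandomAccessFile;
import java.io.Serializable;
import java.util.ArrayList;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MultiStorePacketState implements Serializable {
    private /*static*/ Logger logger = LoggerFactory.getLogger(MultiStorePacketState.class);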
    public transient RandomAccessFile currentFile = null;
    public long timerValue;
    public String fileName;
    public String exportedFileName;
    public String sessionKey;
    public long fileOffset = 0;
    public int partNumber = 0;
    public boolean isCoverageIncorrect = false;
    public boolean fileIsOpened = false;
    public ArrayList<ClassifierOutput> sessionMetadata = new ArrayList<>();

    private void readObject(ObjectInputStream ois)
            throws ClassNotFoundException, IOException {
        ois.defaultReadObject();
        logger.info("Deserialized MultiStorePacketState: " + this.toString());

        // Nothing to reopen if no part file is currently open
        if (!fileIsOpened || fileName == null || fileName.isEmpty()) {
            return;
        }
        currentFile = new RandomAccessFile(fileName, "rw");
        currentFile.seek(fileOffset);
    }
}
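The readObject hook above only runs when the object goes through standard Java serialization (ObjectInputStream). The sketch below illustrates that mechanism in isolation, using a hypothetical HandleState class in which a StringBuilder stands in for the RandomAccessFile: the transient field is skipped on write and rebuilt from the serialized fileName on read. Later in the thread the author reports that readObject was never called in the running job, so this shows only what the class relies on, not what the state backend actually does.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for MultiStorePacketState; StringBuilder plays the file handle.
class HandleState implements Serializable {
    private static final long serialVersionUID = 1L;

    transient StringBuilder handle; // skipped by Java serialization
    String fileName;                // written and read normally

    // Invoked by ObjectInputStream during Java deserialization.
    private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
        ois.defaultReadObject();    // restores fileName; handle is still null here
        if (fileName != null && !fileName.isEmpty()) {
            handle = new StringBuilder(fileName); // "reopen" the resource
        }
    }
}

class TransientDemo {
    // Standard Java serialization round trip through an in-memory buffer.
    static <T extends Serializable> T roundTrip(T value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        HandleState original = new HandleState();
        original.fileName = "session-0.pcap";
        original.handle = new StringBuilder("handle-from-before-serialization");

        HandleState copy = roundTrip(original);
        System.out.println(copy.handle); // session-0.pcap (rebuilt, not copied)
    }
}
```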

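The fileIsOpened workaround described later in the thread amounts to treating the transient handle as a cache that may disappear whenever the state object is recreated, and rebuilding it from the serializable fields on demand. A minimal sketch, assuming a trimmed-down hypothetical FileStateSketch class and an ensureOpen() helper that are not part of the original code:

```java
import java.io.IOException;
import java.io.RandomAccessFile;

// Hypothetical, trimmed-down state: only the fields the reopen logic needs.
class FileStateSketch {
    transient RandomAccessFile currentFile; // may come back null after the state is recreated
    String fileName;
    long fileOffset;
    boolean fileIsOpened;

    // Returns an open handle positioned at fileOffset, reopening it if the
    // transient field was reset; returns null if no part file is open.
    RandomAccessFile ensureOpen() throws IOException {
        if (currentFile == null && fileIsOpened && fileName != null) {
            currentFile = new RandomAccessFile(fileName, "rw");
            currentFile.seek(fileOffset);
        }
        return currentFile;
    }
}
```

Calling ensureOpen() once, right after state.value(), would keep the null check in one place instead of repeating it at every use of currentFile.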
Input and output are Protobuf messages; you could replace the input with
byte[] and remove the output generation and the calls to the collector.
The test should generate and store data for a while, so that at least one
checkpoint is triggered. I used a checkpoint interval of 5000 ms on a
fairly slow system.
Every data chunk is about 1 KB.
Utils.startWriteToPcap can be replaced with new RandomAccessFile(...).
Utils.continueWriteToPcap can be replaced with currentFile.write().
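For such a reproduction, the pcap helpers reduce to plain RandomAccessFile calls. A minimal sketch, assuming hypothetical ReproFileUtils.startWrite/continueWrite methods that write raw byte[] chunks with no pcap headers:

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.List;

// Hypothetical replacements for the project's Utils helpers (raw bytes, no pcap format).
final class ReproFileUtils {
    private ReproFileUtils() {}

    // Stand-in for Utils.startWriteToPcap: create or truncate the file, then write the first chunks.
    static RandomAccessFile startWrite(String fileName, List<byte[]> chunks) throws IOException {
        RandomAccessFile file = new RandomAccessFile(fileName, "rw");
        file.setLength(0); // start from an empty part file
        continueWrite(file, chunks);
        return file;
    }

    // Stand-in for Utils.continueWriteToPcap: write chunks at the current file pointer.
    static void continueWrite(RandomAccessFile file, List<byte[]> chunks) throws IOException {
        for (byte[] chunk : chunks) {
            file.write(chunk);
        }
    }

    public static void main(String[] args) throws IOException {
        File tmp = File.createTempFile("session-0-", ".bin");
        tmp.deleteOnExit();
        byte[] chunk = new byte[1024]; // ~1 KB per chunk, as described above
        try (RandomAccessFile f = startWrite(tmp.getPath(), List.of(chunk, chunk))) {
            continueWrite(f, List.of(chunk));
            System.out.println(f.length()); // 3072
        }
    }
}
```

Because writes always happen at the end of the file, the fileOffset = currentFile.length() bookkeeping in storeContent stays equal to the file pointer.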

On Mon, 11 Oct 2021 at 16:50, JING ZHANG <beyond1...@gmail.com> wrote:

> Hi Alex,
> That is a little strange.
> Could you please provide a program that reproduces the problem, including
> the DataStream job code and the code of the related classes? I need to do
> some debugging to find the cause.
>
> Best,
> JING ZHANG
>
>
> On Mon, 11 Oct 2021 at 5:50 PM, Alex Drobinsky <alex.drobin...@gmail.com> wrote:
>
>> Hi Jing Zhang,
>>
>> I'm using the FileSystem state backend. I also implemented a readObject
>> method to support a proper restart procedure:
>>
>> private void readObject(ObjectInputStream ois)
>>         throws ClassNotFoundException, IOException {
>>     ois.defaultReadObject();
>>     logger.info("Deserialized MultiStorePacketState: " + this.toString());
>>
>>     // No need to do anything if no file name has been set yet
>>     if (fileName == null || fileName.isEmpty()) {
>>         return;
>>     }
>>     currentFile = new RandomAccessFile(fileName, "rw");
>>     currentFile.seek(fileOffset);
>> }
>>
>> However, according to the logs, this method was never called.
>>
>> By the way, it could be useful to add a state type of this kind, e.g. a
>> FileState that encapsulates serialization/deserialization of a
>> RandomAccessFile, although the concept itself somewhat contradicts regular state.
>>
>> For now, I have implemented and tested a workaround by adding the boolean
>> field fileIsOpened; however, it's awkward because I need to check the
>> transient field every time I call state.value().
>>
>> So, is it expected that transient fields in state are reset to their
>> default values?
>>
>>
>> On Mon, 11 Oct 2021 at 12:33, JING ZHANG <beyond1...@gmail.com> wrote:
>>
>>> Hi Alex,
>>> Which state backend are you using?
>>> If you use MemoryStateBackend or FsStateBackend, the `transient` keyword
>>> may have no effect, because these backends do not serialize state on
>>> regular read/write accesses but keep it as objects on the heap.
>>> If you use RocksDBStateBackend, I think this is expected behavior:
>>> RocksDBStateBackend stores all state as byte arrays in embedded RocksDB
>>> instances, so it de/serializes the state of a key on every read/write
>>> access. currentFile is null because transient fields are not serialized
>>> by default.
>>>
>>> Best,
>>> JING ZHANG
>>>
>>>
>>> On Mon, 11 Oct 2021 at 4:33 PM, Alex Drobinsky <alex.drobin...@gmail.com> wrote:
>>>
>>>> Dear Flink community,
>>>>
>>>> I have the following state class (irrelevant fields removed):
>>>> public class MultiStorePacketState implements Serializable {
>>>>
>>>>     public transient RandomAccessFile currentFile = null;
>>>>     public long timerValue;
>>>>     public String fileName;
>>>>     public String exportedFileName;
>>>>     public String sessionKey;
>>>>     public long fileOffset = 0;
>>>>
>>>> }
>>>>
>>>> Once in a while, currentFile becomes *null*; this happens after I
>>>> extract the state via
>>>>
>>>> MultiStorePacketState so = state.value();
>>>>
>>>> The frequency of this behaviour matches the checkpoint interval (the
>>>> checkpoint interval is set to 5 seconds, and the problem first occurs
>>>> after 5 seconds as well); otherwise, I have no clue what could explain it.
>>>>
>>>> Is this an expected side effect of the checkpoint procedure?
>>>>
>>>> Best regards,
>>>> Alex
>>>>
>>>>
