Hi!

Currently, the Flink File Source relies on a Set<Path> pathsAlreadyProcessed in
the SplitEnumerator to track which files have been processed, and it skips any
file that is already in this set. However, this set only ever grows, so it can
eventually exhaust memory if new files are continuously added to the input
path.
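To make the problem concrete, here is a simplified sketch of set-based
de-duplication. It is not the actual Flink code: paths are modeled as plain
Strings, and SetBasedDeduplicator, filterNew() and trackedPaths() are
hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified model of the current set-based de-duplication. */
class SetBasedDeduplicator {
    // Every path ever seen stays here; nothing is ever removed.
    private final Set<String> pathsAlreadyProcessed = new HashSet<>();

    /** Returns only the paths not seen before, and remembers them. */
    List<String> filterNew(List<String> discovered) {
        List<String> fresh = new ArrayList<>();
        for (String path : discovered) {
            // Set.add returns false if the path was already present.
            if (pathsAlreadyProcessed.add(path)) {
                fresh.add(path);
            }
        }
        return fresh;
    }

    int trackedPaths() {
        return pathsAlreadyProcessed.size();
    }
}
```

Because nothing is ever removed, trackedPaths() grows by one for every
distinct file that is ever discovered.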

I submitted https://issues.apache.org/jira/browse/FLINK-22792 and would
like to be assigned to the ticket.

My current proposal is below; I would like to reach agreement on the approach
before starting on it.

   1. Maintain a fileWatermark in ContinuousFileSplitEnumerator, advanced by
      the modification times of newly discovered files.
   2. Change Set<Path> pathsAlreadyProcessed to a HashMap<Path, Long>
      pathsAlreadyProcessed, where the key is, as before, the path of an
      already processed file and the value is that file's modification time.
      Expose a getModificationTime() method on FileSourceSplit.
   3. Add a user-configurable fileExpireTime option: any file whose
      modification time is older than fileWatermark - fileExpireTime is
      ignored.
   4. When snapshotting the SplitEnumerator, remove files older than
      fileWatermark - fileExpireTime from the pathsAlreadyProcessed map.
   5. Add the alreadyProcessedPaths map and fileWatermark to
      PendingSplitsCheckpoint, and extend PendingSplitsCheckpointSerializer
      with a version-2 serializer that serializes the alreadyProcessedPaths
      map, including file modification times.
   6. Subclasses of PendingSplitsCheckpoint, such as
      ContinuousHivePendingSplitsCheckpoint, are not affected: they initialize
      an empty alreadyProcessedMap and 0 as the initial watermark.
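The enumerator-side bookkeeping proposed above could look roughly like the
following. This is a minimal sketch under stated assumptions, not Flink code:
paths are Strings, modification times are epoch milliseconds, and
DiscoveredFile, ExpiringPathTracker, filterNew() and pruneOnSnapshot() are
hypothetical names. In this sketch the watermark is advanced only by files
that are actually accepted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical discovered file: path plus modification time (epoch millis). */
final class DiscoveredFile {
    final String path;
    final long modificationTime;

    DiscoveredFile(String path, long modificationTime) {
        this.path = path;
        this.modificationTime = modificationTime;
    }
}

/** Hypothetical tracker combining fileWatermark, fileExpireTime and the path map. */
class ExpiringPathTracker {
    private final long fileExpireTime; // user-configured, millis
    private final Map<String, Long> pathsAlreadyProcessed = new HashMap<>();
    private long fileWatermark = 0L; // max modification time accepted so far

    ExpiringPathTracker(long fileExpireTime) {
        this.fileExpireTime = fileExpireTime;
    }

    /** Files older than this are ignored, and their entries may be pruned. */
    private long cutoff() {
        return fileWatermark - fileExpireTime;
    }

    /** Returns files that are neither expired nor already processed. */
    List<DiscoveredFile> filterNew(List<DiscoveredFile> discovered) {
        List<DiscoveredFile> fresh = new ArrayList<>();
        for (DiscoveredFile f : discovered) {
            if (f.modificationTime < cutoff()) {
                continue; // too old: ignored and not tracked
            }
            // putIfAbsent returns null only if the path was not tracked yet.
            if (pathsAlreadyProcessed.putIfAbsent(f.path, f.modificationTime) == null) {
                fresh.add(f);
                fileWatermark = Math.max(fileWatermark, f.modificationTime);
            }
        }
        return fresh;
    }

    /** Called when snapshotting: drop entries that the cutoff filters anyway. */
    void pruneOnSnapshot() {
        long threshold = cutoff();
        pathsAlreadyProcessed.values().removeIf(modTime -> modTime < threshold);
    }

    int trackedPaths() {
        return pathsAlreadyProcessed.size();
    }

    long watermark() {
        return fileWatermark;
    }
}
```

Pruning at snapshot time is safe in this sketch because any entry it removes is
older than the cutoff, so if that path is rediscovered it is dropped by the age
check instead of being reprocessed. The trade-off is that a genuinely new file
whose modification time is already older than fileWatermark - fileExpireTime is
also skipped.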
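The version-2 serializer change could be sketched with plain java.io streams as
below. In Flink, PendingSplitsCheckpointSerializer implements
SimpleVersionedSerializer and also encodes the pending splits; the hypothetical
ProcessedPathsStateSerializerV2 here encodes only the new state (fileWatermark
plus the path-to-modification-time map), and it omits the branch a real
implementation would need to restore version-1 checkpoints.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical holder for the state a version-2 checkpoint would add. */
final class ProcessedPathsState {
    final Map<String, Long> alreadyProcessedPaths; // path -> modification time (epoch millis)
    final long fileWatermark;

    ProcessedPathsState(Map<String, Long> alreadyProcessedPaths, long fileWatermark) {
        this.alreadyProcessedPaths = alreadyProcessedPaths;
        this.fileWatermark = fileWatermark;
    }
}

/** Hypothetical version-2 layout: watermark, entry count, then (path, modTime) pairs. */
final class ProcessedPathsStateSerializerV2 {
    static final int VERSION = 2;

    static byte[] serialize(ProcessedPathsState state) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(state.fileWatermark);
            out.writeInt(state.alreadyProcessedPaths.size());
            for (Map.Entry<String, Long> entry : state.alreadyProcessedPaths.entrySet()) {
                out.writeUTF(entry.getKey());    // file path (writeUTF limits it to 65535 encoded bytes)
                out.writeLong(entry.getValue()); // file modification time
            }
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to serialize processed paths", e);
        }
        return bytes.toByteArray();
    }

    static ProcessedPathsState deserialize(int version, byte[] serialized) {
        if (version != VERSION) {
            // A real serializer would also decode version-1 checkpoints here.
            throw new IllegalArgumentException("Unsupported version: " + version);
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            long fileWatermark = in.readLong();
            int count = in.readInt();
            Map<String, Long> paths = new HashMap<>();
            for (int i = 0; i < count; i++) {
                String path = in.readUTF();
                paths.put(path, in.readLong());
            }
            return new ProcessedPathsState(paths, fileWatermark);
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to deserialize processed paths", e);
        }
    }
}
```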

Thanks!
