For posterity: the problem was in the FileStreamSourceLog class. I needed to override the method shouldRetain, which by default returns true; its doc says: "Default implementation retains all log entries. Implementations should override the method to change the behavior."
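A minimal sketch of what such an override could look like. Caveats: FileStreamSourceLog is an internal (package-private) Spark class, not a public API, so this has to be compiled into the same package or applied as a patched build; the shouldRetain signature shown follows Spark 3.1.x and may differ in other versions. The class name ExpiringFileStreamSourceLog and the maxAgeMs retention parameter are hypothetical, not from the original thread.

```scala
// NOT a public API: FileStreamSourceLog is private to Spark's SQL module,
// so this sketch assumes it is compiled into the same package (or shipped
// as a patched Spark build). Signatures follow Spark 3.1.x.
package org.apache.spark.sql.execution.streaming

import java.util.concurrent.TimeUnit

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry

class ExpiringFileStreamSourceLog(
    metadataLogVersion: Int,
    sparkSession: SparkSession,
    path: String,
    maxAgeMs: Long = TimeUnit.DAYS.toMillis(7)) // hypothetical retention window
  extends FileStreamSourceLog(metadataLogVersion, sparkSession, path) {

  // The default implementation retains all log entries, which is why the
  // .compact file grows without bound. Dropping entries older than maxAgeMs
  // bounds its size. Trade-off: a dropped entry is treated as never seen,
  // so a source file older than maxAgeMs could be re-ingested if it still
  // matches the input path.
  override def shouldRetain(log: FileEntry, currentTime: Long): Boolean =
    currentTime - log.timestamp < maxAgeMs
}
```

The retention window should be chosen well above the maximum age of files that can still appear in the source directory, otherwise reprocessing duplicates become possible.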
--
Kind regards/ Pozdrawiam,
Wojciech Indyk

On Sat, 30 Apr 2022 at 12:35, Wojciech Indyk <wojciechin...@gmail.com> wrote:

> Hi Gourav!
> I use stateless processing: no watermarking, no aggregations.
> I don't want any data loss, so changing the checkpoint location is not an
> option for me.
>
> --
> Kind regards/ Pozdrawiam,
> Wojciech Indyk
>
>
> On Fri, 29 Apr 2022 at 11:07, Gourav Sengupta <gourav.sengu...@gmail.com>
> wrote:
>
>> Hi,
>>
>> This may not solve the problem, but have you tried to stop the job
>> gracefully, and then restart without much delay by pointing to a new
>> checkpoint location? The approach will have certain uncertainties in
>> scenarios where the source system can lose data, or where we do not
>> expect duplicates to be committed, etc.
>>
>> It would be good to know what kind of processing you are doing as well.
>>
>> Regards,
>> Gourav
>>
>> On Fri, Apr 29, 2022 at 8:11 AM Wojciech Indyk <wojciechin...@gmail.com>
>> wrote:
>>
>>> Update on the scenario of deleting compact files: the job recovers from
>>> the most recent (non-compacted) checkpoint file, but at the next
>>> compaction it fails because the most recent compact file is missing.
>>> I use Spark 3.1.2.
>>>
>>> --
>>> Kind regards/ Pozdrawiam,
>>> Wojciech Indyk
>>>
>>>
>>> On Fri, 29 Apr 2022 at 07:00, Wojciech Indyk <wojciechin...@gmail.com>
>>> wrote:
>>>
>>>> Hello!
>>>> I use Spark Structured Streaming. I need to use S3 for storing
>>>> checkpoint metadata (I know it's not an optimal storage for checkpoint
>>>> metadata). The compaction interval is 10 (the default) and I set
>>>> "spark.sql.streaming.minBatchesToRetain"=5. After the job had been
>>>> running for a few weeks, checkpointing time increased significantly
>>>> (causing a few minutes' delay in processing). I looked at the
>>>> checkpoint metadata structure. There is one heavy path there:
>>>> checkpoint/source/0. A single .compact file weighs 25 GB.
>>>> I looked into its content: it contains all entries since batch 0 (the
>>>> current batch is around 25000). I tried a few parameters to remove
>>>> already processed data from the compact file, namely:
>>>> "spark.cleaner.referenceTracking.cleanCheckpoints"=true - does not
>>>> work. As far as I've seen in the code, it relates to the previous
>>>> version of streaming, doesn't it?
>>>> "spark.sql.streaming.fileSource.log.deletion"=true and
>>>> "spark.sql.streaming.fileSink.log.deletion"=true - don't work either.
>>>> The compact file stores the full history even when all data have been
>>>> processed (except for the most recent checkpoint), so I would expect
>>>> most of the entries to be deleted. Is there any parameter to remove
>>>> entries from the compact file, or to remove the compact file gracefully
>>>> from time to time? For now I am testing a scenario where I stop the
>>>> job, delete most of the checkpoint/source/0/* files, keeping just a
>>>> few recent (non-compacted) checkpoints, and rerun the job. The job
>>>> recovers correctly from the recent checkpoint. It looks like a
>>>> possible workaround for my problem, but this scenario with manual
>>>> deletion of checkpoint files looks ugly, so I would prefer something
>>>> managed by Spark.
>>>>
>>>> --
>>>> Kind regards/ Pozdrawiam,
>>>> Wojciech Indyk
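For reference, the manual workaround described in the quoted message can be sketched as a file-selection rule. The sketch below assumes the default compact interval of 10, under which Spark names a batch file "<batchId>.compact" when (batchId + 1) is a multiple of the interval and plain "<batchId>" otherwise; the function name and the keep-count of 5 are illustrative, not from the thread.

```python
def files_to_keep(batch_ids, compact_interval=10, keep_recent=5):
    """Return the checkpoint/source/0 file names to keep when applying the
    manual cleanup: only the most recent non-compacted batch files survive,
    so the job restarts from the latest plain checkpoint instead of reading
    the oversized .compact file."""
    def name(batch_id):
        # A batch is written as a compact file when (batch_id + 1) is a
        # multiple of the compact interval, e.g. 9, 19, 29, ...
        if (batch_id + 1) % compact_interval == 0:
            return f"{batch_id}.compact"
        return str(batch_id)

    plain = [b for b in sorted(batch_ids)
             if (b + 1) % compact_interval != 0]
    return [name(b) for b in plain[-keep_recent:]]

# Example: with batches 0..24, batches 9 and 19 are compact files; they are
# deleted along with everything older than the 5 newest plain batches.
print(files_to_keep(range(25)))  # ['20', '21', '22', '23', '24']
```

As the follow-up in the thread notes, the job recovers from the remaining plain checkpoint but fails at the next compaction because the previous compact file is gone, which is why the shouldRetain override above the thread is the cleaner fix.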