Hi Gourav! I use stateless processing, no watermarking, no aggregations. I don't want any data loss, so changing the checkpoint location is not an option for me.
--
Kind regards/ Pozdrawiam,
Wojciech Indyk


On Fri, 29 Apr 2022 at 11:07, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:

> Hi,
>
> this may not solve the problem, but have you tried to stop the job
> gracefully, and then restart without much delay by pointing to a new
> checkpoint location? The approach will have certain uncertainties for
> scenarios where the source system can lose data, or where we do not expect
> duplicates to be committed, etc.
>
> It will be good to know what kind of processing you are doing as well.
>
> Regards,
> Gourav
>
> On Fri, Apr 29, 2022 at 8:11 AM Wojciech Indyk <wojciechin...@gmail.com> wrote:
>
>> Update for the scenario of deleting compact files: the job recovers from the
>> most recent (not compacted) checkpoint file, but when the next checkpoint
>> compaction runs, it fails with a missing recent compact file. I use Spark 3.1.2.
>>
>> --
>> Kind regards/ Pozdrawiam,
>> Wojciech Indyk
>>
>> On Fri, 29 Apr 2022 at 07:00, Wojciech Indyk <wojciechin...@gmail.com> wrote:
>>
>>> Hello!
>>> I use Spark Structured Streaming. I need to use S3 for storing checkpoint
>>> metadata (I know, it is not an optimal storage for checkpoint metadata).
>>> The compaction interval is 10 (the default) and I set
>>> "spark.sql.streaming.minBatchesToRetain"=5. After the job had been running
>>> for a few weeks, checkpointing time increased significantly (causing a few
>>> minutes of delay in processing). I looked at the checkpoint metadata
>>> structure. There is one heavy path there: checkpoint/source/0. A single
>>> .compact file weighs 25GB. I looked into its content and it contains all
>>> entries since batch 0 (the current batch is around 25000). I tried a few
>>> parameters to remove already processed data from the compact file, namely:
>>> "spark.cleaner.referenceTracking.cleanCheckpoints"=true - does not work.
>>> As far as I can see in the code, it relates to the previous version of
>>> streaming, doesn't it?
>>> "spark.sql.streaming.fileSource.log.deletion"=true and
>>> "spark.sql.streaming.fileSink.log.deletion"=true do not work either.
>>> The compact file stores the full history even though all the data has been
>>> processed (except for the most recent checkpoint), so I would expect most
>>> of the entries to be deleted. Is there any parameter to remove entries from
>>> the compact file, or to remove the compact file gracefully from time to
>>> time? I am now testing a scenario where I stop the job, delete most of the
>>> checkpoint/source/0/* files, keeping just a few recent (not compacted)
>>> checkpoints, and rerun the job. The job recovers correctly from the most
>>> recent checkpoint. It looks like a possible workaround for my problem, but
>>> this scenario with manual deletion of checkpoint files looks ugly, so I
>>> would prefer something managed by Spark.
>>>
>>> --
>>> Kind regards/ Pozdrawiam,
>>> Wojciech Indyk
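
For reference, here is a minimal sketch (in Scala) of the kind of stateless file-source job described above, with the retention and log-deletion settings mentioned in the thread set on the session. The bucket paths, schema, source/sink formats and trigger interval are placeholders of mine, not details from the original job:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types._

val spark = SparkSession.builder()
  .appName("file-source-stream-with-s3-checkpoint")
  // Settings discussed in the thread; 10 is already the default compact interval.
  .config("spark.sql.streaming.minBatchesToRetain", "5")
  .config("spark.sql.streaming.fileSource.log.compactInterval", "10")
  // Deletion flags that were tried; they did not shrink checkpoint/source/0/*.compact.
  .config("spark.sql.streaming.fileSource.log.deletion", "true")
  .config("spark.sql.streaming.fileSink.log.deletion", "true")
  .getOrCreate()

// Placeholder schema for the input files.
val schema = new StructType().add("id", LongType).add("payload", StringType)

// Stateless processing: no watermark, no aggregation.
val input = spark.readStream
  .format("json")                 // assumed source format
  .schema(schema)
  .load("s3a://my-bucket/input/") // placeholder input path

val query = input.writeStream
  .format("parquet")              // assumed sink format
  .option("path", "s3a://my-bucket/output/")
  // The file-source metadata log (checkpoint/source/0) lives under this location.
  .option("checkpointLocation", "s3a://my-bucket/checkpoint/")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .start()

query.awaitTermination()

As observed in the thread, each compaction rewrites every entry from the previous .compact file plus the newer deltas, so when nothing expires entries from the file-source log, that single file keeps growing with the number of processed batches.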