Hello!
I use Spark Structured Streaming. I need to use S3 for storing checkpoint
metadata (I know it's not an optimal store for checkpoint metadata).
The compaction interval is 10 (the default) and I set
"spark.sql.streaming.minBatchesToRetain"=5. After the job had been running for
a few weeks, checkpointing time increased significantly (causing a few
minutes' delay in processing). I looked at the checkpoint metadata structure.
There is one heavy path there: checkpoint/source/0. A single .compact file
weighs 25 GB. I looked into its content and it contains all entries since
batch 0 (the current batch is around 25000). I tried a few parameters to
remove already processed data from the compact file, namely:
"spark.cleaner.referenceTracking.cleanCheckpoints"=true - does not work. From
what I've seen in the code it relates to the older streaming API, doesn't it?
"spark.sql.streaming.fileSource.log.deletion"=true and
"spark.sql.streaming.fileSink.log.deletion"=true don't work either.
The compact file keeps the full history even though all the data (except for
the most recent checkpoint) have already been processed, so I would expect
most of the entries to be deleted. Is there any parameter to remove entries
from the compact file, or to remove the compact file gracefully from time to
time? Right now I am testing a scenario where I stop the job, delete most of
the checkpoint/source/0/* files, keeping just a few recent (non-compacted)
checkpoints, and rerun the job. The job recovers correctly from the most
recent checkpoint. This looks like a possible workaround for my problem, but
manually deleting checkpoint files looks ugly, so I would prefer something
managed by Spark.
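The manual cleanup I'm testing looks roughly like the sketch below, run only
while the job is stopped. The bucket name, path and the number of batch files
to keep are placeholders I chose, not anything Spark prescribes:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import scala.util.Try

// metadata log of source 0, as described above (placeholder bucket)
val logDir = new Path("s3a://my-bucket/checkpoint/source/0")
val fs = logDir.getFileSystem(new Configuration())

val all = fs.listStatus(logDir).map(_.getPath)

// keep only the most recent non-compacted batch files; everything else,
// including the huge .compact file, gets deleted
val keep = all
  .filterNot(_.getName.endsWith(".compact"))
  .flatMap(p => Try(p.getName.toLong).toOption.map(_ -> p))
  .sortBy(_._1)
  .takeRight(5)
  .map(_._2)
  .toSet

all.filterNot(keep).foreach(p => fs.delete(p, false))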

--
Kind regards/ Pozdrawiam,
Wojciech Indyk
