For posterity: the problem was in the FileStreamSourceLog class. I needed to
override the shouldRetain method, which by default returns true and whose doc
says: "Default implementation retains all log entries. Implementations should
override the method to change the behavior."
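
For anyone who finds this thread later, here is a rough sketch of the kind of
override I mean (not my exact code). FileStreamSourceLog, FileEntry and
shouldRetain are internal, package-private Spark classes/methods, so the
subclass has to live in the same package or be built into a patched Spark;
the class name PrunedFileStreamSourceLog and the 7-day threshold are made-up
examples, and the exact shouldRetain signature differs across Spark versions,
so match whatever CompactibleFileStreamLog declares in your build.

package org.apache.spark.sql.execution.streaming

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry

// Hypothetical subclass; the constructor mirrors FileStreamSourceLog's
// (metadataLogVersion, sparkSession, path) arguments in Spark 3.1.x.
class PrunedFileStreamSourceLog(
    metadataLogVersion: Int,
    sparkSession: SparkSession,
    path: String)
  extends FileStreamSourceLog(metadataLogVersion, sparkSession, path) {

  // Example policy: keep only entries whose file timestamp is at most 7 days
  // old instead of the full history since batch 0. Older entries are dropped
  // the next time the metadata log is compacted. The threshold is made up.
  private val maxEntryAgeMs = 7L * 24 * 60 * 60 * 1000

  // NOTE: match the exact signature declared by CompactibleFileStreamLog in
  // your Spark version; some versions also pass a currentTime argument.
  override def shouldRetain(log: FileEntry): Boolean =
    log.timestamp >= System.currentTimeMillis() - maxEntryAgeMs
}

The awkward part is wiring it in: as far as I can tell, FileStreamSource
constructs FileStreamSourceLog directly, so in practice this means building
the change into a patched Spark rather than flipping a configuration switch.
Only drop entries you are certain will never need to be reprocessed.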

--
Kind regards/ Pozdrawiam,
Wojciech Indyk


On Sat, Apr 30, 2022 at 12:35 Wojciech Indyk <wojciechin...@gmail.com>
wrote:

> Hi Gourav!
> I use stateless processing, no watermarking, no aggregations.
> I don't want any data loss, so changing the checkpoint location is not an
> option for me.
>
> --
> Kind regards/ Pozdrawiam,
> Wojciech Indyk
>
>
> On Fri, Apr 29, 2022 at 11:07 Gourav Sengupta <gourav.sengu...@gmail.com>
> wrote:
>
>> Hi,
>>
>> This may not solve the problem, but have you tried stopping the job
>> gracefully and then restarting it without much delay, pointing to a new
>> checkpoint location? The approach carries some uncertainty in scenarios
>> where the source system can lose data, where we must not commit
>> duplicates, etc.
>>
>> It will be good to know what kind of processing you are doing as well.
>>
>>
>> Regards,
>> Gourav
>>
>> On Fri, Apr 29, 2022 at 8:11 AM Wojciech Indyk <wojciechin...@gmail.com>
>> wrote:
>>
>>> Update on the scenario of deleting compact files: the job recovers from
>>> the most recent (non-compacted) checkpoint file, but when the next
>>> checkpoint compaction runs it fails because the most recent compact file
>>> is missing. I use Spark 3.1.2.
>>>
>>> --
>>> Kind regards/ Pozdrawiam,
>>> Wojciech Indyk
>>>
>>>
>>> On Fri, Apr 29, 2022 at 07:00 Wojciech Indyk <wojciechin...@gmail.com>
>>> wrote:
>>>
>>>> Hello!
>>>> I use Spark Structured Streaming. I need to use S3 for storing checkpoint
>>>> metadata (I know it's not the optimal storage for checkpoint metadata).
>>>> The compaction interval is 10 (default) and I set
>>>> "spark.sql.streaming.minBatchesToRetain"=5. After the job had been running
>>>> for a few weeks, checkpointing time increased significantly (causing a few
>>>> minutes of delay in processing). I looked at the checkpoint metadata
>>>> structure. There is one heavy path there: checkpoint/source/0. A single
>>>> .compact file weighs 25 GB. I looked into its content and it contains all
>>>> entries since batch 0 (the current batch is around 25000). I tried a few
>>>> parameters to remove already-processed data from the compact file, namely:
>>>> - "spark.cleaner.referenceTracking.cleanCheckpoints"=true - does not work.
>>>> As far as I can see in the code, it relates to the previous (DStream)
>>>> version of streaming, doesn't it?
>>>> - "spark.sql.streaming.fileSource.log.deletion"=true and
>>>> "spark.sql.streaming.fileSink.log.deletion"=true - don't work either.
>>>> The compact file stores the full history even though all the data (except
>>>> for the most recent checkpoint) has already been processed, so I would
>>>> expect most of the entries to be deleted. Is there any parameter to remove
>>>> entries from the compact file, or to remove the compact file gracefully
>>>> from time to time? I am now testing a scenario in which I stop the job,
>>>> delete most of the checkpoint/source/0/* files, keeping just a few recent
>>>> (non-compacted) checkpoints, and rerun the job. The job recovers correctly
>>>> from the recent checkpoint. It looks like a possible workaround for my
>>>> problem, but manually deleting checkpoint files is ugly, so I would prefer
>>>> something managed by Spark.
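>>>>
>>>> For completeness, this is roughly how those settings are passed to the
>>>> job; the paths, formats and app name below are placeholders rather than
>>>> my real pipeline, and none of these options shrank the .compact file:
>>>>
>>>> import org.apache.spark.sql.SparkSession
>>>>
>>>> // Sketch only: placeholder paths and formats; the config keys are the
>>>> // ones mentioned above, none of which pruned checkpoint/source/0 for me.
>>>> val spark = SparkSession.builder()
>>>>   .appName("file-source-checkpoint-test")
>>>>   .config("spark.sql.streaming.minBatchesToRetain", "5")
>>>>   .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true") // DStream-era cleaner
>>>>   .config("spark.sql.streaming.fileSource.log.deletion", "true")
>>>>   .config("spark.sql.streaming.fileSink.log.deletion", "true")
>>>>   .getOrCreate()
>>>>
>>>> val query = spark.readStream
>>>>   .format("text")
>>>>   .load("s3://my-bucket/input/")                              // placeholder source
>>>>   .writeStream
>>>>   .format("parquet")
>>>>   .option("path", "s3://my-bucket/output/")                   // placeholder sink
>>>>   .option("checkpointLocation", "s3://my-bucket/checkpoint/") // checkpoint/source/0 grows here
>>>>   .start()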
>>>>
>>>> --
>>>> Kind regards/ Pozdrawiam,
>>>> Wojciech Indyk
>>>>
>>>
