Hi Maciek,

Thanks for bringing this up again and sorry for not opening the discussion yet.

I will check it out and get back to you during the week.

Kostas

> On Nov 26, 2016, at 9:40 PM, Maciek Próchniak <m...@touk.pl> wrote:
> 
> Hi Kostas,
> 
> I didn't see any discussion on the dev mailing list, so I'd like to share our 
> problems/solutions (we had a busy month...;)
> 
> 1. we refactored ContinuousFileMonitoringFunction so that the state includes not 
> only lastModificationTime, but also the list of files that have exactly this 
> modification time. This way we're sure that we don't lose any files that 
> appear later with the same modification time. It turned out that for the local 
> file system this is quite important, as the modification time in Java can have 
> one-second resolution (see e.g. 
> http://stackoverflow.com/questions/24804618/get-file-mtime-with-millisecond-resolution-from-java
>  - we learned it the hard way...)
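The bookkeeping in point 1 can be sketched like this (a minimal plain-Java illustration with made-up names, not the actual Flink classes): keep the maximum modification time seen so far plus every path sharing that timestamp, so a file created later within the same one-second mtime bucket is still picked up on the next scan.

```java
import java.util.HashSet;
import java.util.Set;

public class MonitoringState {
    private long lastModificationTime = Long.MIN_VALUE;
    private final Set<String> filesAtLastTimestamp = new HashSet<>();

    /** Returns true if the file is new and should be forwarded downstream. */
    public boolean shouldProcess(String path, long mtime) {
        if (mtime < lastModificationTime) {
            return false; // strictly older: already covered by earlier scans
        }
        if (mtime > lastModificationTime) {
            // strictly newer timestamp: start a fresh bucket
            lastModificationTime = mtime;
            filesAtLastTimestamp.clear();
            filesAtLastTimestamp.add(path);
            return true;
        }
        // same timestamp: process only paths we have not seen yet
        return filesAtLastTimestamp.add(path);
    }

    public static void main(String[] args) {
        MonitoringState s = new MonitoringState();
        System.out.println(s.shouldProcess("a.csv", 1000)); // true
        System.out.println(s.shouldProcess("a.csv", 1000)); // false (already seen)
        System.out.println(s.shouldProcess("b.csv", 1000)); // true (same mtime, new file)
        System.out.println(s.shouldProcess("c.csv", 999));  // false (older)
        System.out.println(s.shouldProcess("d.csv", 2000)); // true (newer bucket)
    }
}
```

With only a `lastModificationTime` long in state, `b.csv` above would be dropped; keeping the set for the current bucket is what closes the gap.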
> 
> 2. we are able to safely delete files in the following way:
>    - in ContinuousFileReaderOperator we emit an additional marker event after 
> the end of each split
>    - each split carries information about how many splits the file contains
>    - we added an additional operator with parallelism 1 after 
> ContinuousFileReaderOperator which tracks these marker events, so it knows 
> when all splits from a file have been processed, and deletes finished files 
> after the appropriate checkpoints have been completed.
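The counting done by that parallelism-1 operator could look roughly like this (an illustrative sketch with hypothetical names, not actual Flink operator code): each end-of-split marker carries the file path and the file's total split count, and when the per-file counter reaches that total the file is complete and can be scheduled for deletion.

```java
import java.util.HashMap;
import java.util.Map;

public class FileCompletionTracker {
    // per-file count of end-of-split markers received so far
    private final Map<String, Integer> processedSplits = new HashMap<>();

    /** Records one finished split; returns true when the whole file is done. */
    public boolean splitFinished(String path, int totalSplitsInFile) {
        int done = processedSplits.merge(path, 1, Integer::sum);
        if (done == totalSplitsInFile) {
            processedSplits.remove(path); // complete: safe to schedule deletion
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        FileCompletionTracker t = new FileCompletionTracker();
        System.out.println(t.splitFinished("big.csv", 3)); // false (1 of 3)
        System.out.println(t.splitFinished("big.csv", 3)); // false (2 of 3)
        System.out.println(t.splitFinished("big.csv", 3)); // true  (all splits done)
    }
}
```

Running this at parallelism 1 is what makes the counting safe: markers from all parallel readers funnel through a single instance, so no completion event can be missed or double-counted.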
> 
> If you & other committers find these ideas OK, I can prepare JIRAs and pull 
> requests. While the first point is pretty straightforward IMHO, I'd like to 
> get some feedback on the second one.
> 
> thanks,
> maciek
> 
> On 18/10/2016 11:52, Kostas Kloudas wrote:
>> Hi Maciek,
>> 
>> I agree with you that 1ms is often too long :P
>> 
>> This is the reason why I will open a discussion to have
>> all the ideas/ requirements / shortcomings in a single place.
>> This way the community can track and influence what
>> is coming next.
>> 
>> Hopefully I will do it in the afternoon and I will send you
>> the discussion thread.
>> 
>> Cheers,
>> Kostas
>> 
>>> On Oct 18, 2016, at 11:43 AM, Maciek Próchniak <m...@touk.pl> wrote:
>>> 
>>> Hi Kostas,
>>> 
>>> thanks for quick answer.
>>> 
>>> I wouldn't dare to delete files in the InputFormat if they were split and 
>>> processed in parallel...
>>> 
>>> As for using notifyCheckpointComplete - thanks for the suggestion, it looks 
>>> pretty interesting, I'll try it out. Although I wonder a bit whether 
>>> relying only on the modification timestamp is enough - many things may 
>>> happen in one ms :)
>>> 
>>> thanks,
>>> 
>>> maciek
>>> 
>>> 
>>> On 18/10/2016 11:14, Kostas Kloudas wrote:
>>>> Hi Maciek,
>>>> 
>>>> Just a follow-up on the previous email: given that splits are read in 
>>>> parallel, when the ContinuousFileMonitoringFunction forwards the last 
>>>> split, this does not mean that the final split is going to be processed 
>>>> last. If the node it gets assigned to is fast enough, it may be processed 
>>>> before the others.
>>>> 
>>>> This assumption only holds if you have a parallelism of 1.
>>>> 
>>>> Cheers,
>>>> Kostas
>>>> 
>>>>> On Oct 18, 2016, at 11:05 AM, Kostas Kloudas 
>>>>> <k.klou...@data-artisans.com> wrote:
>>>>> 
>>>>> Hi Maciek,
>>>>> 
>>>>> Currently this functionality is not supported, but it seems like a good 
>>>>> addition. Actually, given that the feature is rather new, we were 
>>>>> thinking of opening a discussion on the dev mailing list in order to
>>>>> 
>>>>> i) discuss some current limitations of the Continuous File Processing 
>>>>> source
>>>>> ii) see how people use it and adjust our features accordingly
>>>>> 
>>>>> I will let you know as soon as I open this thread.
>>>>> 
>>>>> By the way, for your use-case, we should probably have a callback in 
>>>>> notifyCheckpointComplete() that informs the source that a given 
>>>>> checkpoint was successfully completed, so that we can purge the already 
>>>>> processed files. This could be a good solution.
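The purge-on-checkpoint pattern suggested here can be sketched in plain Java (illustrative names; the real implementation would implement Flink's CheckpointListener and its notifyCheckpointComplete(long) callback): files finished between checkpoints are parked under the next checkpoint id and only released for deletion once that checkpoint is confirmed, so a restore can never need to replay an already deleted file.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PendingDeletions {
    // files fully read, keyed by the checkpoint that must complete first
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();

    /** Called when a file is fully read; would be snapshotted with operator state. */
    public void fileFinished(long checkpointId, String path) {
        pending.computeIfAbsent(checkpointId, k -> new ArrayList<>()).add(path);
    }

    /** Mirrors CheckpointListener#notifyCheckpointComplete(long). */
    public List<String> notifyCheckpointComplete(long completedId) {
        List<String> toDelete = new ArrayList<>();
        // everything registered under this or an earlier checkpoint is now safe
        Map<Long, List<String>> done = pending.headMap(completedId, true);
        done.values().forEach(toDelete::addAll);
        done.clear(); // headMap is a view, so this drops the entries from pending
        return toDelete; // caller performs the actual file system delete
    }

    public static void main(String[] args) {
        PendingDeletions p = new PendingDeletions();
        p.fileFinished(1L, "a.csv");
        p.fileFinished(2L, "b.csv");
        System.out.println(p.notifyCheckpointComplete(1L)); // [a.csv]
        System.out.println(p.notifyCheckpointComplete(2L)); // [b.csv]
    }
}
```

Draining every checkpoint id up to and including the completed one (rather than only the exact id) keeps the sketch correct even if a checkpoint notification is skipped or arrives late.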
>>>>> 
>>>>> Thanks,
>>>>> Kostas
>>>>> 
>>>>>> On Oct 18, 2016, at 9:40 AM, Maciek Próchniak <m...@touk.pl> wrote:
>>>>>> 
>>>>>> Hi,
>>>>>> 
>>>>>> we want to monitor an HDFS (or local) directory, read the CSV files that 
>>>>>> appear, and after successful processing - delete them (mainly so as not 
>>>>>> to run out of disk space...)
>>>>>> 
>>>>>> I'm not quite sure how to achieve this with the current implementation. 
>>>>>> Previously, when we read binary data (unsplittable files), we made a 
>>>>>> small hack and deleted them in our FileInputFormat - but now we want to 
>>>>>> use splits, and detecting which split is 'the last one' is no longer so 
>>>>>> obvious - of course it's also problematic when it comes to 
>>>>>> checkpointing...
>>>>>> 
>>>>>> So my question is - is there an idiomatic way of deleting processed files?
>>>>>> 
>>>>>> 
>>>>>> thanks,
>>>>>> 
>>>>>> maciek
>>>>>> 
>> 
> 
