Hi Maciek,

Thanks for bringing this up again and sorry for not opening the discussion yet.
I will check it out and get back to you during the week.

Kostas

> On Nov 26, 2016, at 9:40 PM, Maciek Próchniak <m...@touk.pl> wrote:
>
> Hi Kostas,
>
> I didn't see any discussion on the dev mailing list, so I'd like to share our problems/solutions (we had a busy month... ;)
>
> 1. We refactored ContinuousFileMonitoringFunction so that the state includes not only lastModificationTime, but also the list of files that have exactly this modification time. This way we're sure that we don't lose any files that appear later with the same modification time. It turned out that for the local file system this is quite important, as modification time in Java can have one-second resolution (see e.g.
> http://stackoverflow.com/questions/24804618/get-file-mtime-with-millisecond-resolution-from-java
> - we learned it the hard way...)
>
> 2. We are able to safely delete files in the following way:
> - in ContinuousFileReaderOperator we emit an additional marker event after the end of a split
> - the split contains information about how many splits are in the file
> - we added an additional operator of parallelism 1 after ContinuousFileReaderOperator, which tracks the marker events so that it knows when all splits from a file have been processed, and deletes finished files after the appropriate checkpoints have been completed.
>
> If you & other committers find these ideas OK, I can prepare JIRAs and pull requests. While the first point is pretty straightforward IMHO, I'd like to get some feedback on the second one.
>
> thanks,
> maciek
>
> On 18/10/2016 11:52, Kostas Kloudas wrote:
>> Hi Maciek,
>>
>> I agree with you that 1 ms is often too long :P
>>
>> This is the reason why I will open a discussion to have all the ideas / requirements / shortcomings in a single place. This way the community can track and influence what is coming next.
>>
>> Hopefully I will do it in the afternoon and I will send you the discussion thread.
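[Editor's aside on point 1 above: the state-tracking logic can be sketched independently of the Flink APIs. This is a minimal illustration, not the actual patch; `MtimeTracker` and `accept` are hypothetical names, and in the real change this state would live inside ContinuousFileMonitoringFunction's checkpointed state.]

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the idea in point 1: alongside the last seen
// modification time we remember every file observed at exactly that
// timestamp, so a file that appears later with the same (second-resolution)
// mtime is not silently skipped, and files seen twice are not reprocessed.
public class MtimeTracker {
    private long lastModificationTime = Long.MIN_VALUE;
    private final Set<String> filesAtLastTime = new HashSet<>();

    /** Returns true if the file should be processed, updating the state. */
    public boolean accept(String path, long mtime) {
        if (mtime < lastModificationTime) {
            return false; // strictly older than anything already seen
        }
        if (mtime > lastModificationTime) {
            // newer timestamp: reset the per-timestamp file set
            lastModificationTime = mtime;
            filesAtLastTime.clear();
            filesAtLastTime.add(path);
            return true;
        }
        // same timestamp: process only if this exact file is new
        return filesAtLastTime.add(path);
    }
}
```

Both the timestamp and the file set would need to be part of the operator's snapshotted state so the guarantee survives a restore.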
>> Cheers,
>> Kostas
>>
>>> On Oct 18, 2016, at 11:43 AM, Maciek Próchniak <m...@touk.pl> wrote:
>>>
>>> Hi Kostas,
>>>
>>> thanks for the quick answer.
>>>
>>> I wouldn't dare to delete files in the InputFormat if they were split and processed in parallel...
>>>
>>> As for using notifyCheckpointComplete - thanks for the suggestion, it looks pretty interesting, I'll try it out. Although I wonder a bit if relying only on the modification timestamp is enough - many things may happen in one ms :)
>>>
>>> thanks,
>>>
>>> maciek
>>>
>>> On 18/10/2016 11:14, Kostas Kloudas wrote:
>>>> Hi Maciek,
>>>>
>>>> Just a follow-up on the previous email: given that splits are read in parallel, when the ContinuousFileMonitoringFunction forwards the last split, it does not mean that the final split is going to be processed last. If the node it gets assigned to is fast enough, then it may be processed faster than others.
>>>>
>>>> This assumption only holds if you have a parallelism of 1.
>>>>
>>>> Cheers,
>>>> Kostas
>>>>
>>>>> On Oct 18, 2016, at 11:05 AM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>>>>
>>>>> Hi Maciek,
>>>>>
>>>>> Currently this functionality is not supported, but this seems like a good addition. Actually, given that the feature is rather new, we were thinking of opening a discussion on the dev mailing list in order to
>>>>>
>>>>> i) discuss some current limitations of the Continuous File Processing source
>>>>> ii) see how people use it and adjust our features accordingly
>>>>>
>>>>> I will let you know as soon as I open this thread.
>>>>>
>>>>> By the way, for your use case, we should probably have a callback in notifyCheckpointComplete() that will inform the source that a given checkpoint was successfully performed, and then we can purge the already processed files. This can be a good solution.
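[Editor's aside: the purge-on-checkpoint idea above can be sketched without any Flink dependencies. `CheckpointPurger`, `fileFinished`, and `deletedFiles` are hypothetical names introduced for illustration; only the `notifyCheckpointComplete(long)` signature mirrors Flink's CheckpointListener callback. The point is that a fully-read file is deleted only after a checkpoint covering it completes, so a restore from an earlier checkpoint can still re-read it.]

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch: files finished before checkpoint N are queued under
// that checkpoint id, and only purged once notifyCheckpointComplete(N)
// (or a later checkpoint) arrives.
public class CheckpointPurger {
    private final NavigableMap<Long, List<String>> pendingByCheckpoint = new TreeMap<>();
    private final List<String> deleted = new ArrayList<>();

    /** Called when all splits of a file were read before this checkpoint. */
    public void fileFinished(long checkpointId, String path) {
        pendingByCheckpoint
            .computeIfAbsent(checkpointId, k -> new ArrayList<>())
            .add(path);
    }

    /** Mirrors CheckpointListener#notifyCheckpointComplete(long). */
    public void notifyCheckpointComplete(long checkpointId) {
        // everything queued up to and including this checkpoint is now safe
        Map<Long, List<String>> ready = pendingByCheckpoint.headMap(checkpointId, true);
        ready.values().forEach(deleted::addAll); // stand-in for the real delete
        ready.clear();
    }

    public List<String> deletedFiles() {
        return deleted;
    }
}
```

Checkpoints may complete out of order of file completion, which is why the sketch uses a sorted map and purges everything at or below the completed id.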
>>>>> Thanks,
>>>>> Kostas
>>>>>
>>>>>> On Oct 18, 2016, at 9:40 AM, Maciek Próchniak <m...@touk.pl> wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> we want to monitor an HDFS (or local) directory, read CSV files that appear, and after successful processing delete them (mainly not to run out of disk space...)
>>>>>>
>>>>>> I'm not quite sure how to achieve this with the current implementation. Previously, when we read binary data (unsplittable files), we made a small hack and deleted them in our FileInputFormat - but now we want to use splits, and detecting which split is 'the last one' is no longer so obvious - of course it's also problematic when it comes to checkpointing...
>>>>>>
>>>>>> So my question is - is there an idiomatic way of deleting processed files?
>>>>>>
>>>>>> thanks,
>>>>>>
>>>>>> maciek