> This is not a robust solution, I would advise against it. Oh no? Am curious as to why not. It seems not dissimilar to how Kafka topic retention works: the messages are removed after some time period (hopefully after they are processed), so why would it be bad to remove files that are already processed?
Or was it the querying of the checkpoints you were advising against? To be sure, I was referring to moving the previously processed files away, not the checkpoints themselves. On Fri, Oct 27, 2023 at 12:45 PM Alexander Fedulov < alexander.fedu...@gmail.com> wrote: > > I wonder if you could use this fact to query the committed checkpoints > and move them away after the job is done. > > This is not a robust solution, I would advise against it. > > Best, > Alexander > > On Fri, 27 Oct 2023 at 16:41, Andrew Otto <o...@wikimedia.org> wrote: > >> For moving the files: >> > It will keep the files as is and remember the name of the file read in >> checkpointed state to ensure it doesnt read the same file twice. >> >> I wonder if you could use this fact to query the committed checkpoints >> and move them away after the job is done. I think it should even be safe >> to do this outside of the Flink job periodically (cron, whatever), because >> on restart it won't reprocess the files that have been committed in the >> checkpoints. >> >> >> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/libs/state_processor_api/#reading-state >> >> >> >> >> On Fri, Oct 27, 2023 at 1:13 AM arjun s <arjunjoice...@gmail.com> wrote: >> >>> Hi team, Thanks for your quick response. >>> I have an inquiry regarding file processing in the event of a job >>> restart. When the job is restarted, we encounter challenges in tracking >>> which files have been processed and which remain pending. Is there a method >>> to seamlessly resume processing files from where they were left off, >>> particularly in situations where we need to submit and restart the job >>> manually due to any server restart or application restart? This becomes an >>> issue when the job processes all the files in the directory from the >>> beginning after a restart, and I'm seeking a solution to address this. >>> >>> Thanks and regards, >>> Arjun >>> >>> On Fri, 27 Oct 2023 at 07:29, Chirag Dewan <chirag.dewa...@yahoo.in> >>> wrote: >>> >>>> Hi Arjun, >>>> >>>> Flink's FileSource doesnt move or delete the files as of now. It will >>>> keep the files as is and remember the name of the file read in checkpointed >>>> state to ensure it doesnt read the same file twice. >>>> >>>> Flink's source API works in a way that single Enumerator operates on >>>> the JobManager. The enumerator is responsible for listing the files and >>>> splitting these into smaller units. These units could be the complete file >>>> (in case of row formats) or splits within a file (for bulk formats). The >>>> reading is done by SplitReaders in the Task Managers. This way it ensures >>>> that only reading is done concurrently and is able to track file >>>> completions. >>>> >>>> You can read more Flink Sources >>>> <https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/sources/> >>>> and here >>>> <https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/filesystem/> >>>> >>>> FileSystem >>>> >>>> FileSystem # This connector provides a unified Source and Sink for >>>> BATCH and STREAMING that reads or writes (par... >>>> >>>> <https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/filesystem/> >>>> >>>> >>>> >>>> On Thursday, 26 October, 2023 at 06:53:23 pm IST, arjun s < >>>> arjunjoice...@gmail.com> wrote: >>>> >>>> >>>> Hello team, >>>> I'm currently in the process of configuring a Flink job. This job >>>> entails reading files from a specified directory and then transmitting the >>>> data to a Kafka sink. I've already successfully designed a Flink job that >>>> reads the file contents in a streaming manner and effectively sends them to >>>> Kafka. However, my specific requirement is a bit more intricate. I need the >>>> job to not only read these files and push the data to Kafka but also >>>> relocate the processed file to a different directory once all of its >>>> contents have been processed. Following this, the job should seamlessly >>>> transition to processing the next file in the source directory. >>>> Additionally, I have some concerns regarding how the job will behave if it >>>> encounters a restart. Could you please advise if this is achievable, and if >>>> so, provide guidance or code to implement it? >>>> >>>> I'm also quite interested in how the job will handle situations where >>>> the source has a parallelism greater than 2 or 3, and how it can accurately >>>> monitor the completion of reading all contents in each file. >>>> >>>> Thanks and Regards, >>>> Arjun >>>> >>>