I played with your example. Indeed, create_tracker in your ProcessFilesFn
is never called, which is quite strange.
I could not find any example that uses chained SDFs, which makes me
wonder whether chaining SDFs works at all.
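
Separately, the "SDF finishes" behaviour of the commented-out return
variant may just be plain Python control flow: a return placed before
tracker.defer_remainder leaves process without ever asking to resume. A
minimal sketch of that difference, without Beam (FakeTracker and both
process_* functions are hypothetical stand-ins, not Beam APIs), and it
does not explain the stuck case:

```python
class FakeTracker:
    """Hypothetical stand-in for a restriction tracker; not a Beam class."""

    def __init__(self):
        self.deferred = False

    def defer_remainder(self):
        # Records that the caller asked to be resumed later.
        self.deferred = True


def process_with_return(files, tracker):
    # No `yield` in the body: a plain function that exits at `return`.
    if files:
        return list(files)
    tracker.defer_remainder()  # skipped whenever files were found


def process_with_yield(files, tracker):
    # A generator: the body only runs while the caller iterates over it.
    if files:
        for f in files:
            yield f
    else:
        return
    tracker.defer_remainder()  # reached once the caller drains the generator


returned_tracker = FakeTracker()
returned = process_with_return(["a.txt", "b.txt"], returned_tracker)

yielded_tracker = FakeTracker()
yielded = list(process_with_yield(["a.txt", "b.txt"], yielded_tracker))

print(returned, returned_tracker.deferred)
print(yielded, yielded_tracker.deferred)
```

Both variants emit the same files, but only the generator version reaches
defer_remainder.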

@Chamikara Jayalath <chamik...@google.com> Any thoughts?

On Fri, May 3, 2024 at 2:45 AM Jaehyeon Kim <dott...@gmail.com> wrote:

> Hello,
>
> I am building a pipeline using two chained SDFs. The first function
> (DirectoryWatchFn) continuously checks a folder and picks up any newly
> added file. The second one (ProcessFilesFn) processes a file, splitting
> it by line - the processing simply prints the file name and line number.
>
> The process function of the first SDF gets stuck if I yield a new file
> object. Specifically, although the second SDF is called as I can check the
> initial restriction is created, the tracker is not created at all!
>
> On the other hand, if I return the file object list, the second SDF works
> fine but the issue is the first SDF stops as soon as it returns the first
> list of files.
>
> The source of the pipeline can be found in
> - First SDF:
> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/directory_watch.py
> - Second SDF:
> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/file_read.py
> - Pipeline:
> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/streaming_file_read.py
>
> Can you please inform me how to handle this issue?
>
> Cheers,
> Jaehyeon
>
> class DirectoryWatchFn(beam.DoFn):
>     POLL_TIMEOUT = 10
>
>     @beam.DoFn.unbounded_per_element()
>     def process(
>         self,
>         element: str,
>         tracker: RestrictionTrackerView = beam.DoFn.RestrictionParam(
>             DirectoryWatchRestrictionProvider()
>         ),
>         watermark_estimater: WatermarkEstimatorProvider = beam.DoFn.WatermarkEstimatorParam(
>             DirectoryWatchWatermarkEstimatorProvider()
>         ),
>     ) -> typing.Iterable[MyFile]:
>         new_files = self._get_new_files_if_any(element, tracker)
>         if self._process_new_files(tracker, watermark_estimater, new_files):
>             # return [new_file[0] for new_file in new_files]  # <-- it doesn't get stuck but the SDF finishes
>             for new_file in new_files:  # <-- it gets stuck if yielding file objects
>                 yield new_file[0]
>         else:
>             return
>         tracker.defer_remainder(Duration.of(self.POLL_TIMEOUT))
>
>     def _get_new_files_if_any(
>         self, element: str, tracker: DirectoryWatchRestrictionTracker
>     ) -> typing.List[typing.Tuple[MyFile, Timestamp]]:
>         new_files = []
>         for file in os.listdir(element):
>             if (
>                 os.path.isfile(os.path.join(element, file))
>                 and file not in tracker.current_restriction().already_processed
>             ):
>                 num_lines = sum(1 for _ in open(os.path.join(element, file)))
>                 new_file = MyFile(file, 0, num_lines)
>                 print(new_file)
>                 new_files.append(
>                     (
>                         new_file,
>                         Timestamp.of(os.path.getmtime(os.path.join(element, file))),
>                     )
>                 )
>         return new_files
>
>     def _process_new_files(
>         self,
>         tracker: DirectoryWatchRestrictionTracker,
>         watermark_estimater: ManualWatermarkEstimator,
>         new_files: typing.List[typing.Tuple[MyFile, Timestamp]],
>     ):
>         max_instance = watermark_estimater.current_watermark()
>         for new_file in new_files:
>             if tracker.try_claim(new_file[0].name) is False:
>                 watermark_estimater.set_watermark(max_instance)
>                 return False
>             if max_instance < new_file[1]:
>                 max_instance = new_file[1]
>         watermark_estimater.set_watermark(max_instance)
>         return max_instance < MAX_TIMESTAMP
>
