Hello,

I am building a pipeline that chains two SDFs. The first
function (DirectoryWatchFn) continuously polls a folder and picks up any
newly added files. The second one (ProcessFilesFn) processes each file,
splitting it line by line - the processing simply prints the file name and
line number.

The process function of the first SDF gets stuck if I yield a new file
object. Specifically, the second SDF is invoked (I can see that its
initial restriction is created), but the tracker is never created!

On the other hand, if I return a list of file objects instead, the second
SDF works fine, but then the first SDF stops as soon as it returns the
first list of files.

The source of the pipeline can be found in
- First SDF:
https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/directory_watch.py
- Second SDF:
https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/file_read.py
- Pipeline:
https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/streaming_file_read.py

Can you please inform me how to handle this issue?

Cheers,
Jaehyeon

class DirectoryWatchFn(beam.DoFn):
    """Splittable DoFn that polls a directory and emits a ``MyFile`` per new file.

    Each ``process`` call lists the directory given as the element, tries to
    claim every regular file whose name is not yet in the restriction's
    ``already_processed`` collection, emits the claimed files, and then defers
    the remainder so that the directory is polled again later.
    """

    # Value handed to ``Duration.of`` when deferring the remainder
    # (presumably seconds -- confirm against the Beam ``Duration`` docs).
    POLL_TIMEOUT = 10

    @beam.DoFn.unbounded_per_element()
    def process(
        self,
        element: str,
        tracker: RestrictionTrackerView = beam.DoFn.RestrictionParam(
            DirectoryWatchRestrictionProvider()
        ),
        watermark_estimater: WatermarkEstimatorProvider = (
            beam.DoFn.WatermarkEstimatorParam(
                DirectoryWatchWatermarkEstimatorProvider()
            )
        ),
    ) -> typing.Iterable[MyFile]:
        """Emit the files newly found in ``element`` and schedule the next poll.

        Args:
            element: Path of the directory to watch.
            tracker: Restriction tracker used to claim file names and to defer
                the remainder of the work.
            watermark_estimater: Watermark estimator advanced to the newest
                modification time among the claimed files.

        Yields:
            One ``MyFile`` per newly discovered file.
        """
        new_files = self._get_new_files_if_any(element, tracker)
        if not self._process_new_files(tracker, watermark_estimater, new_files):
            # A claim was rejected or the watermark reached MAX_TIMESTAMP:
            # finish without scheduling another poll.
            return
        # NOTE (author's observation): replacing the loop below with
        #   return [new_file[0] for new_file in new_files]
        # does not get stuck, but the SDF then finishes after the first list
        # because the defer_remainder call is never reached. Yielding the file
        # objects, as done here, is the variant reported to get stuck.
        for new_file, _ in new_files:
            yield new_file
        tracker.defer_remainder(Duration.of(self.POLL_TIMEOUT))

    def _get_new_files_if_any(
        self, element: str, tracker: DirectoryWatchRestrictionTracker
    ) -> typing.List[typing.Tuple[MyFile, Timestamp]]:
        """Return ``(MyFile, modification time)`` pairs for unprocessed files.

        Only regular files directly inside ``element`` whose names are not in
        the current restriction's ``already_processed`` collection are
        returned. Each file is read once to count its lines.
        """
        # The restriction is not modified in this method, so read it once.
        already_processed = tracker.current_restriction().already_processed
        new_files = []
        for file in os.listdir(element):
            path = os.path.join(element, file)
            if os.path.isfile(path) and file not in already_processed:
                # Use a context manager so the handle is closed right away
                # instead of leaking one open file per poll.
                with open(path) as handle:
                    num_lines = sum(1 for _ in handle)
                new_file = MyFile(file, 0, num_lines)
                print(new_file)
                new_files.append(
                    (new_file, Timestamp.of(os.path.getmtime(path)))
                )
        return new_files

    def _process_new_files(
        self,
        tracker: DirectoryWatchRestrictionTracker,
        watermark_estimater: ManualWatermarkEstimator,
        new_files: typing.List[typing.Tuple[MyFile, Timestamp]],
    ) -> bool:
        """Claim every new file and advance the watermark.

        Returns:
            ``False`` as soon as a claim is rejected, otherwise whether the
            resulting watermark is still below ``MAX_TIMESTAMP``. In both
            cases the watermark is set to the largest timestamp seen so far.
        """
        max_instance = watermark_estimater.current_watermark()
        for new_file, modified_at in new_files:
            if not tracker.try_claim(new_file.name):
                # Persist the progress made before the rejected claim.
                watermark_estimater.set_watermark(max_instance)
                return False
            if max_instance < modified_at:
                max_instance = modified_at
        watermark_estimater.set_watermark(max_instance)
        return max_instance < MAX_TIMESTAMP

Reply via email to