Have you tried other runners? I think this might be caused by gaps in the Python DirectRunner's support for streaming cases or SDFs.
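For example, something like the following (a hypothetical invocation - it assumes your script forwards standard Beam pipeline options from the command line and that a Flink runner is available locally) would let you rule out DirectRunner-specific behavior:

```shell
# Run the same pipeline on the Flink runner instead of the DirectRunner,
# to check whether the stuck SDF is a DirectRunner streaming/SDF gap.
python streaming_file_read.py --runner=FlinkRunner --streaming
```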
On Sun, May 5, 2024 at 5:10 AM Jaehyeon Kim <dott...@gmail.com> wrote:

> Hi XQ
>
> Thanks for checking it out. SDF chaining seems to work: I created my
> pipeline by converting one built with the Java SDK. The source of the
> Java pipeline can be found at
> https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter7/src/main/java/com/packtpub/beam/chapter7/StreamingFileRead.java
>
> So far, when I yield outputs, the second SDF gets stuck, while it gets
> executed if I return them (but then the first SDF completes). If I
> change the second SDF into a plain DoFn without the tracker, it
> executes fine. I'm not sure what happens in the first scenario.
>
> Cheers,
> Jaehyeon
>
> On Sun, 5 May 2024 at 09:21, XQ Hu via user <user@beam.apache.org> wrote:
>
>> I played with your example. Indeed, create_tracker in your ProcessFilesFn
>> is never called, which is quite strange.
>> I could not find any example that shows chained SDFs, which makes me
>> wonder whether chained SDFs work at all.
>>
>> @Chamikara Jayalath <chamik...@google.com> Any thoughts?
>>
>> On Fri, May 3, 2024 at 2:45 AM Jaehyeon Kim <dott...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I am building a pipeline using two chained SDFs. The first function
>>> (DirectoryWatchFn) continuously checks a folder and picks up any new
>>> file that is added. The second one (ProcessFilesFn) processes a file
>>> while splitting each line - the processing simply prints the file
>>> name and line number.
>>>
>>> The process function of the first SDF gets stuck if I yield a new
>>> file object. Specifically, although the second SDF is called - I can
>>> see that the initial restriction is created - its tracker is never
>>> created at all!
>>>
>>> On the other hand, if I return the file object list, the second SDF
>>> works fine, but then the first SDF stops as soon as it returns the
>>> first list of files.
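As a side note, the yield-vs-return difference matters in plain Python too: a function that returns a list runs its body eagerly at the call, while a generator's body only runs when the output is actually consumed, so any side effects (claims, watermark updates) are deferred until iteration. A minimal stdlib sketch, with purely illustrative names (no Beam involved):

```python
def eager_process():
    # Body executes immediately when called; all work is done up front.
    print("eager: scanning directory")
    return ["file-a", "file-b"]

def lazy_process():
    # Body executes only when the caller iterates over the generator;
    # nothing runs on the bare call itself.
    print("lazy: scanning directory")
    yield "file-a"
    yield "file-b"

files = eager_process()   # prints right away
gen = lazy_process()      # prints nothing yet
consumed = list(gen)      # body runs here, during consumption
```

This does not by itself explain why the tracker is never created, but it is consistent with the runner driving the two cases differently.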
>>>
>>> The source of the pipeline can be found at:
>>> - First SDF:
>>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/directory_watch.py
>>> - Second SDF:
>>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/file_read.py
>>> - Pipeline:
>>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/streaming_file_read.py
>>>
>>> Can you please advise how to handle this issue?
>>>
>>> Cheers,
>>> Jaehyeon
>>>
>>> class DirectoryWatchFn(beam.DoFn):
>>>     POLL_TIMEOUT = 10
>>>
>>>     @beam.DoFn.unbounded_per_element()
>>>     def process(
>>>         self,
>>>         element: str,
>>>         tracker: RestrictionTrackerView = beam.DoFn.RestrictionParam(
>>>             DirectoryWatchRestrictionProvider()
>>>         ),
>>>         watermark_estimater: WatermarkEstimatorProvider = beam.DoFn.WatermarkEstimatorParam(
>>>             DirectoryWatchWatermarkEstimatorProvider()
>>>         ),
>>>     ) -> typing.Iterable[MyFile]:
>>>         new_files = self._get_new_files_if_any(element, tracker)
>>>         if self._process_new_files(tracker, watermark_estimater, new_files):
>>>             # return [new_file[0] for new_file in new_files]  # <-- doesn't get stuck, but the SDF finishes
>>>             for new_file in new_files:  # <-- gets stuck if yielding file objects
>>>                 yield new_file[0]
>>>         else:
>>>             return
>>>         tracker.defer_remainder(Duration.of(self.POLL_TIMEOUT))
>>>
>>>     def _get_new_files_if_any(
>>>         self, element: str, tracker: DirectoryWatchRestrictionTracker
>>>     ) -> typing.List[typing.Tuple[MyFile, Timestamp]]:
>>>         new_files = []
>>>         for file in os.listdir(element):
>>>             if (
>>>                 os.path.isfile(os.path.join(element, file))
>>>                 and file not in tracker.current_restriction().already_processed
>>>             ):
>>>                 num_lines = sum(1 for _ in open(os.path.join(element, file)))
>>>                 new_file = MyFile(file, 0, num_lines)
>>>                 print(new_file)
>>>                 new_files.append(
>>>                     (
>>>                         new_file,
>>>                         Timestamp.of(os.path.getmtime(os.path.join(element, file))),
>>>                     )
>>>                 )
>>>         return new_files
>>>
>>>     def _process_new_files(
>>>         self,
>>>         tracker: DirectoryWatchRestrictionTracker,
>>>         watermark_estimater: ManualWatermarkEstimator,
>>>         new_files: typing.List[typing.Tuple[MyFile, Timestamp]],
>>>     ):
>>>         max_instance = watermark_estimater.current_watermark()
>>>         for new_file in new_files:
>>>             if tracker.try_claim(new_file[0].name) is False:
>>>                 watermark_estimater.set_watermark(max_instance)
>>>                 return False
>>>             if max_instance < new_file[1]:
>>>                 max_instance = new_file[1]
>>>         watermark_estimater.set_watermark(max_instance)
>>>         return max_instance < MAX_TIMESTAMP
>>
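One way to narrow this down is to exercise the claim/watermark flow of _process_new_files outside Beam entirely. The sketch below uses hypothetical test doubles (FakeTracker and FakeEstimator are not Beam classes; MAX_TIMESTAMP is a stand-in for Beam's constant) to check that the pure logic behaves as intended, so any remaining misbehavior can be attributed to the runner:

```python
import typing

class FakeTracker:
    """Hypothetical stand-in for the restriction tracker: a claim
    succeeds only for file names in the allowed set."""
    def __init__(self, claimable: typing.Set[str]):
        self.claimable = claimable
    def try_claim(self, name: str) -> bool:
        return name in self.claimable

class FakeEstimator:
    """Hypothetical stand-in for a manual watermark estimator."""
    def __init__(self, watermark: float):
        self.watermark = watermark
    def current_watermark(self) -> float:
        return self.watermark
    def set_watermark(self, watermark: float) -> None:
        self.watermark = watermark

MAX_TIMESTAMP = float("inf")  # stand-in for Beam's MAX_TIMESTAMP

def process_new_files(tracker, estimator, new_files):
    # Mirrors the _process_new_files logic above: advance the watermark
    # to the newest claimed file timestamp; stop early if a claim fails.
    max_instance = estimator.current_watermark()
    for name, ts in new_files:
        if tracker.try_claim(name) is False:
            estimator.set_watermark(max_instance)
            return False
        if max_instance < ts:
            max_instance = ts
    estimator.set_watermark(max_instance)
    return max_instance < MAX_TIMESTAMP
```

With two claimable files the function returns True and the watermark advances to the newest timestamp; when a claim is refused it returns False and the watermark stays at the maximum of the files claimed so far.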