Have you tried other runners? I think this might be caused by gaps in the
Python DirectRunner's support for streaming pipelines or SDFs.

On Sun, May 5, 2024 at 5:10 AM Jaehyeon Kim <dott...@gmail.com> wrote:

> Hi XQ
>
> Thanks for checking it out. Chaining SDFs seems to work, at least in the
> Java SDK, as I created my pipeline by converting one built with the Java
> SDK. The source of the Java pipeline can be found at
> https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter7/src/main/java/com/packtpub/beam/chapter7/StreamingFileRead.java
>
> So far, when I yield outputs, the second SDF gets stuck, whereas it gets
> executed if I return them (but then the first SDF completes). If I change
> the second SDF into a plain DoFn without a tracker, it runs fine. I am not
> sure what happens in the first scenario.
>
> Cheers,
> Jaehyeon
>
> On Sun, 5 May 2024 at 09:21, XQ Hu via user <user@beam.apache.org> wrote:
>
>> I played with your example. Indeed, create_tracker in your ProcessFilesFn
>> is never called, which is quite strange.
>> I could not find any example that shows chained SDFs, which makes me
>> wonder whether chained SDFs work at all.
>>
>> @Chamikara Jayalath <chamik...@google.com> Any thoughts?
>>
>> On Fri, May 3, 2024 at 2:45 AM Jaehyeon Kim <dott...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I am building a pipeline using two chained SDFs. The first function
>>> (DirectoryWatchFn) continuously watches a folder and picks up any newly
>>> added file. The second one (ProcessFilesFn) processes each file line by
>>> line; the processing simply prints the file name and line number.
>>>
>>> The process function of the first SDF gets stuck if I yield a new file
>>> object. Specifically, the second SDF is called (I can see that its
>>> initial restriction is created), but its tracker is never created!
>>>
>>> On the other hand, if I return the list of file objects, the second SDF
>>> works fine, but then the first SDF stops as soon as it returns the first
>>> list of files.
>>>
>>> The source of the pipeline can be found in
>>> - First SDF:
>>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/directory_watch.py
>>> - Second SDF:
>>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/file_read.py
>>> - Pipeline:
>>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/streaming_file_read.py
>>>
>>> Could you please advise how to handle this issue?
>>>
>>> Cheers,
>>> Jaehyeon
>>>
>>> class DirectoryWatchFn(beam.DoFn):
>>>     POLL_TIMEOUT = 10
>>>
>>>     @beam.DoFn.unbounded_per_element()
>>>     def process(
>>>         self,
>>>         element: str,
>>>         tracker: RestrictionTrackerView = beam.DoFn.RestrictionParam(
>>>             DirectoryWatchRestrictionProvider()
>>>         ),
>>>         watermark_estimater: WatermarkEstimatorProvider = beam.DoFn.WatermarkEstimatorParam(
>>>             DirectoryWatchWatermarkEstimatorProvider()
>>>         ),
>>>     ) -> typing.Iterable[MyFile]:
>>>         new_files = self._get_new_files_if_any(element, tracker)
>>>         if self._process_new_files(tracker, watermark_estimater, new_files):
>>>             # return [new_file[0] for new_file in new_files]
>>>             # ^ it doesn't get stuck, but the SDF finishes
>>>             for new_file in new_files:  # <-- it gets stuck if yielding file objects
>>>                 yield new_file[0]
>>>         else:
>>>             return
>>>         tracker.defer_remainder(Duration.of(self.POLL_TIMEOUT))
>>>
>>>     def _get_new_files_if_any(
>>>         self, element: str, tracker: DirectoryWatchRestrictionTracker
>>>     ) -> typing.List[typing.Tuple[MyFile, Timestamp]]:
>>>         new_files = []
>>>         for file in os.listdir(element):
>>>             if (
>>>                 os.path.isfile(os.path.join(element, file))
>>>                 and file not in tracker.current_restriction().already_processed
>>>             ):
>>>                 num_lines = sum(1 for _ in open(os.path.join(element, file)))
>>>                 new_file = MyFile(file, 0, num_lines)
>>>                 print(new_file)
>>>                 new_files.append(
>>>                     (
>>>                         new_file,
>>>                         Timestamp.of(os.path.getmtime(os.path.join(element, file))),
>>>                     )
>>>                 )
>>>         return new_files
>>>
>>>     def _process_new_files(
>>>         self,
>>>         tracker: DirectoryWatchRestrictionTracker,
>>>         watermark_estimater: ManualWatermarkEstimator,
>>>         new_files: typing.List[typing.Tuple[MyFile, Timestamp]],
>>>     ):
>>>         max_instance = watermark_estimater.current_watermark()
>>>         for new_file in new_files:
>>>             if tracker.try_claim(new_file[0].name) is False:
>>>                 watermark_estimater.set_watermark(max_instance)
>>>                 return False
>>>             if max_instance < new_file[1]:
>>>                 max_instance = new_file[1]
>>>         watermark_estimater.set_watermark(max_instance)
>>>         return max_instance < MAX_TIMESTAMP
>>>
>>
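
One possible reading of the "return" variant quoted above, not confirmed
anywhere in this thread: once the for/yield loop is replaced by a return
statement, both branches of process leave the function before
tracker.defer_remainder is reached, so the first SDF never asks to be resumed.
The sketch below reproduces only that control flow in plain Python, without
Beam. FakeTracker, process_with_return and process_with_yield are hypothetical
names made up for illustration; they are not part of the Beam API or of the
original pipeline.

```python
class FakeTracker:
    """Hypothetical stand-in for a restriction tracker; it only counts deferrals."""

    def __init__(self):
        self.deferred = 0

    def defer_remainder(self, delay=None):
        self.deferred += 1


def process_with_return(files, tracker):
    # Mirrors the commented-out variant: both branches return,
    # so the defer_remainder call below is unreachable.
    if files:
        return list(files)
    else:
        return
    tracker.defer_remainder()


def process_with_yield(files, tracker):
    # Mirrors the yielding variant: this is a generator, and the code after
    # the loop runs once the caller has consumed every yielded element.
    if files:
        for f in files:
            yield f
    else:
        return
    tracker.defer_remainder()


return_tracker = FakeTracker()
returned = process_with_return(["a.txt", "b.txt"], return_tracker)

yield_tracker = FakeTracker()
yielded = list(process_with_yield(["a.txt", "b.txt"], yield_tracker))

print(returned, return_tracker.deferred)
print(yielded, yield_tracker.deferred)
```

If this reading is right, it would account for the first SDF finishing after
its first list of files. It says nothing about why the second SDF's tracker is
never created in the yielding case.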
