Hello Beam Users!

I was looking into a simple Python example of an unbounded pipeline (run with 
the --streaming flag) that generates random numbers, applies a fixed window 
(let's say 5 seconds), then applies a grouping operation (Reshuffle) and prints 
the result just to check.

I noticed that this works as long as there is no grouping operation 
(Reshuffle, GroupBy, etc.) that would leverage the windowing semantics.
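
To make the expected semantics concrete, here is a minimal plain-Python sketch 
(not Beam code) of what a 5-second fixed window followed by a grouping step 
should produce. The helper names `window_start` and `group_by_window` and the 
sample data are hypothetical, and the bucketing rule assumes a window offset 
of zero.

```python
from collections import defaultdict

WINDOW_SIZE = 5  # seconds, matching FixedWindows(5) in the pipeline


def window_start(timestamp, size=WINDOW_SIZE):
    """Return the start of the fixed window containing `timestamp` (offset 0)."""
    return timestamp - (timestamp % size)


def group_by_window(timestamped_values, size=WINDOW_SIZE):
    """Group (value, timestamp) pairs by the window their timestamp falls in."""
    groups = defaultdict(list)
    for value, timestamp in timestamped_values:
        groups[window_start(timestamp, size)].append(value)
    return dict(groups)


# Hypothetical sample data: (random number, event time in seconds).
events = [(17, 0.5), (942, 3.2), (8, 5.0), (300, 9.9), (55, 10.1)]
grouped = group_by_window(events)
for start in sorted(grouped):
    print("[{}, {}) -> {}".format(start, start + WINDOW_SIZE, grouped[start]))
```

In other words, the output I am hoping to see from the real pipeline is one 
batch of numbers per 5-second window.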

    # Imports used by this fragment
    import os
    import apache_beam as beam
    from apache_beam.transforms import window
    from apache_beam.options.pipeline_options import PipelineOptions

    # Get parameters from the command line for the pipeline
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(flags=pipeline_args)

    # Create pipeline
    p = beam.Pipeline(options=pipeline_options)

    # Build the pipeline: generate, window into 5 s, reshuffle, print
    (p
     | "Start pipeline" >> beam.Create([0])
     | "Get values" >> beam.ParDo(RandomNumberGenerator())
     | "Apply fixed windows" >> beam.WindowInto(window.FixedWindows(5))
     | "Reshuffle" >> beam.Reshuffle()
     | "Print" >> beam.Map(
         lambda x: print("{} - {}".format(os.getpid(), x), flush=True))
    )

    # Execute the pipeline and block until it finishes
    result = p.run()
    result.wait_until_finish()


Even though the random generator is unbounded and tagged as such with the 
decorator, the pipeline seems to get stuck. If I make that step finite (i.e. by 
adding a counter and exiting), the code works in regular batch mode.

# =============================================================================
# Splittable DoFn that generates random numbers
# =============================================================================

@beam.transforms.core.DoFn.unbounded_per_element()
class RandomNumberGenerator(beam.DoFn):

    @beam.transforms.core.DoFn.unbounded_per_element()
    def process(self, element):
        import random
        import time

        counter = 0

        while True:
            # Uncomment to make the step finite (works in batch mode):
            # if counter > 5:
            #     break
            nmb = random.randint(0, 1000)
            wait = random.randint(0, 5)
            rnow = time.time()

            print("Randy random", nmb)

            # Emit the number with the current wall-clock time as its timestamp
            yield beam.window.TimestampedValue(nmb, rnow)
            time.sleep(wait)
            counter += 1

I have tried to implement the tracker and watermark as per the documentation, 
but none of that seems to work with either the DirectRunner or the FlinkRunner 
(even on Flink, where the reshuffle is not a custom operation but a vertex 
between the different ParDos). The pipeline just gets stuck.
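
As a point of reference for the watermark question, the toy model below (plain 
Python, not the Beam API) sketches the behavior I would expect under the 
default trigger: a grouping step buffers elements per window and emits a window 
only once the watermark has passed the window's end. If the source never 
advances its watermark, nothing is ever emitted, which would look exactly like 
a stuck pipeline. Whether that is what actually happens here is an assumption 
on my part. `ToyGroupByWindow` and its methods are hypothetical names used only 
for illustration.

```python
from collections import defaultdict


class ToyGroupByWindow:
    """Toy model of a windowed grouping step driven by a watermark."""

    def __init__(self, window_size):
        self.window_size = window_size
        self.buffers = defaultdict(list)  # window start -> buffered values
        self.emitted = []                 # (window start, values) in emit order

    def add(self, value, timestamp):
        """Buffer a value in the fixed window that contains its timestamp."""
        start = timestamp - (timestamp % self.window_size)
        self.buffers[start].append(value)

    def advance_watermark(self, watermark):
        """Emit every buffered window whose end is at or before the watermark."""
        ready = sorted(
            s for s in self.buffers if s + self.window_size <= watermark)
        for start in ready:
            self.emitted.append((start, self.buffers.pop(start)))


step = ToyGroupByWindow(window_size=5)
step.add(10, timestamp=1)
step.add(20, timestamp=4)
step.add(30, timestamp=6)

# No watermark progress yet: everything stays buffered, nothing is emitted.
print(step.emitted)

# Once the watermark reaches 5, the window [0, 5) is complete and is emitted.
step.advance_watermark(5)
print(step.emitted)
```

In this model the value with timestamp 6 stays buffered until the watermark 
reaches 10, which is the kind of stall I seem to be hitting for every window.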

I even tried using the native 
PeriodicImpulse<https://beam.apache.org/releases/pydoc/2.30.0/apache_beam.transforms.periodicsequence.html?highlight=impulse#apache_beam.transforms.periodicsequence.PeriodicImpulse>
 to factor out my own implementation; however, I still got the same result of 
the pipeline being 'stuck' on the GroupBy/Reshuffle operation.

In the past I have created streaming pipelines with the Java SDK using an 
UnboundedSource (now obsolete, it seems, according to the docs). However, I 
noticed that most of the unbounded Python readers, like 
Kafka<https://beam.apache.org/releases/pydoc/2.30.0/_modules/apache_beam/io/kafka.html#ReadFromKafka>
 and 
PubSub<https://beam.apache.org/releases/pydoc/2.30.0/_modules/apache_beam/io/external/gcp/pubsub.html#ReadFromPubSub>
, use ExternalTransforms behind the scenes, so I am starting to wonder whether 
such unbounded sources are supported natively in Python at all.

I have searched the Internet and even tried LLMs for suggestions, but I have 
not found a clear answer on how to achieve this in Python, or whether it is 
even possible. After spending a couple of days on it, I figured I would ask the 
Beam team for your thoughts. If you can point me to any sample that works, so I 
can analyze it and understand what is missing, that would be greatly 
appreciated.



Regards,
JP - A fellow Apache Beam enthusiast
