Is this what you are looking for?

import random
import time

import apache_beam as beam
from apache_beam.transforms import trigger, window
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.utils.timestamp import Timestamp

with beam.Pipeline() as p:
    input = (
        p
        | PeriodicImpulse(
            start_timestamp=time.time(),
            stop_timestamp=time.time() + 16,
            fire_interval=1,
            apply_windowing=False,
        )
        | beam.Map(lambda x: random.random())
        | beam.WindowInto(window.FixedWindows(4))
        | beam.GroupBy()
        | "Print Windows"
        >> beam.transforms.util.LogElements(with_timestamp=True,
                                            with_window=True)
    )

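
In case it helps to see why each logged group holds the elements it does, here is a rough pure-Python sketch (no Beam involved) of how a fixed window is picked from an element timestamp. The helper names fixed_window and group_by_window and the sample timestamps are made up for illustration; only the alignment arithmetic is meant to mirror FixedWindows with the default offset of 0.

```python
from collections import defaultdict


def fixed_window(timestamp, size, offset=0):
    """Return the [start, end) fixed window that contains `timestamp`."""
    # Windows are aligned to multiples of `size` (shifted by `offset`),
    # not to the timestamp of the first element that arrives.
    start = timestamp - (timestamp - offset) % size
    return (start, start + size)


def group_by_window(timestamps, size):
    """Bucket timestamps by the fixed window each one falls into."""
    groups = defaultdict(list)
    for ts in timestamps:
        groups[fixed_window(ts, size)].append(ts)
    return dict(groups)


# Hypothetical input: 16 elements, one second apart, starting at t=101.
timestamps = [101 + i for i in range(16)]
windows = group_by_window(timestamps, size=4)

for (start, end), members in sorted(windows.items()):
    print(f"[{start}, {end}) -> {len(members)} element(s)")
```

Because the windows are aligned to multiples of the window size, a start time that is not itself a multiple of 4 leaves the first and last windows only partly filled, so the groups printed by the pipeline need not all have the same size.
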
On Fri, Mar 8, 2024 at 6:48 AM Puertos tavares, Jose J (Canada) via user <
[email protected]> wrote:
> *Hello Beam Users!*
>
> I was looking for a simple Python example of an unbounded pipeline
> (--streaming flag) that generates random numbers, applies a Fixed Window
> (let’s say 5 seconds), then applies a grouping operation (Reshuffle) and
> prints the result, just to check.
>
> I notice that this works only as long as there is no grouping operation
> (Reshuffle, GroupBy, etc.) that would leverage the windowing semantics.
>
> # Get Parameters from Command Line for the Pipeline
> known_args, pipeline_options = parser.parse_known_args(argv)
> pipeline_options = PipelineOptions(flags=argv)
>
> # Create pipeline
> p = beam.Pipeline(options=pipeline_options)
>
> # Execute Pipeline
> (p | "Start pipeline " >> beam.Create([0])
>    | "Get values" >> beam.ParDo(RandomNumberGenerator())
>    | 'Applied fixed windows ' >> beam.WindowInto(window.FixedWindows(1*5))
>    | 'Reshuffle ' >> beam.Reshuffle()
>    | "Print" >> beam.Map(
>        lambda x: print("{} - {} ".format(os.getpid(), x), flush=True))
> )
>
> result = p.run()
> result.wait_until_finish()
>
> Even though the random generator is unbounded and tagged as such with the
> decorator, the pipeline seems to get stuck. If I make that step finite
> (i.e. adding a counter and exiting), then the code works in regular batch
> mode.
>
> # =============================================================================
> # Class for Splittable DoFn that generates random numbers
> # =============================================================================
> @beam.transforms.core.DoFn.unbounded_per_element()
> class RandomNumberGenerator(beam.DoFn):
>
>     @beam.transforms.core.DoFn.unbounded_per_element()
>     def process(self, element):
>         import random
>         import time
>
>         counter = 0
>
>         while True:
>             #if counter>5:
>             #    break
>             nmb = random.randint(0, 1000)
>             wait = random.randint(0, 5)
>             rnow = time.time()
>
>             print("Randy random", nmb)
>
>             yield beam.window.TimestampedValue(nmb, rnow)
>             time.sleep(wait)
>             counter += 1
>
> I have tried to implement the tracker and watermark as per the
> documentation, but neither seems to work with either the *DirectRunner or
> FlinkRunner* (even there, where Reshuffle is not a custom operation but a
> vertex between the different ParDos). The pipeline just gets stuck.
>
> I even tried using the native PeriodicImpulse
> <https://beam.apache.org/releases/pydoc/2.30.0/apache_beam.transforms.periodicsequence.html?highlight=impulse#apache_beam.transforms.periodicsequence.PeriodicImpulse>
> to rule out my own implementation as the cause; however, I still got the
> same result of it being ‘stuck’ on the GroupBy/Reshuffle operation.
>
> In the past I have created streaming pipelines with the Java SDK using an
> Unbounded Source (now obsolete, it seems, according to the docs). However,
> I noticed that most of the unbounded Python readers like Kafka
> <https://beam.apache.org/releases/pydoc/2.30.0/_modules/apache_beam/io/kafka.html#ReadFromKafka>
> and PubSub
> <https://beam.apache.org/releases/pydoc/2.30.0/_modules/apache_beam/io/external/gcp/pubsub.html#ReadFromPubSub>
> use ExternalTransforms behind the scenes, so I am starting to wonder
> whether such unbounded sources are supported natively in Python at all.
>
> I have searched the Internet and even tried LLMs for suggestions, but I
> have not found a clear answer on how to achieve this in Python, or whether
> it is even possible. After spending a couple of days on it, I figured I
> could ask the Beam team for your thoughts. If you can point me to any
> sample that works, so I can analyze it and understand what is missing, that
> would be greatly appreciated.
>
> Regards,
>
> *JP – A fellow Apache Beam enthusiast *
>