*Hello group:*
I believe it might be interesting to share what I have found so far with your feedback. I have confirmed that the Direct Runner and Flink Runner DO work in streaming mode, but the limitation seems to be more about how the PCollection is defined than about the operators, as shown in my code:
https://play.beam.apache.org/?sdk=python&shared=TJNavCeJ_DS
Since most samples use a messaging system as the streaming source, I decided to use the Google Cloud Pub/Sub emulator <https://cloud.google.com/pubsub/docs/emulator>. In this setup I push a message to the topic at the beginning and create a pipeline that consumes messages from a subscription, applies windowing, applies the GroupBy operation and, at the end, pushes a message back to the topic. This provides an endless consumption loop for streaming.
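To make the loop described above concrete, here is a toy simulation that needs neither Beam nor the emulator. It rests on assumptions that are not in the email: a `deque` stands in for the topic and subscription, each loop iteration stands in for one fixed window, and the pipeline is assumed to republish exactly one message (the count) per window. `simulate_loop` is a hypothetical name.

```python
from collections import deque


def simulate_loop(initial_messages, windows):
    """Toy model of the Pub/Sub 'forever loop' (hypothetical helper, not Beam code).

    Each iteration stands in for one fixed window: it drains everything waiting
    on the fake subscription, reduces it to a single count (the grouping step),
    and publishes one message back so the next window has something to consume.
    """
    topic = deque(initial_messages)  # stands in for the topic + subscription
    counts = []
    for _ in range(windows):
        batch = [topic.popleft() for _ in range(len(topic))]  # consume all pending messages
        counts.append(len(batch))  # one aggregate per window
        topic.append(f"Messages from PubSub :{len(batch)}")  # republish to keep the loop alive
    return counts


seed = [f"Message number {i}" for i in range(1, 10)]
for count in simulate_loop(seed, windows=4):
    print(f"Messages from PubSub :{count}")
```

With nine seed messages it prints the counts 9, 1, 1, 1, which has the same shape as the "Messages from PubSub" lines in the Direct Runner output.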
*gcloud beta emulators pubsub start --project=beam-sample*
Executing: cmd /c …. cloud-pubsub-emulator.bat --host=localhost --port=8085
[pubsub] This is the Google Pub/Sub fake.
[pubsub] Implementation may be incomplete or differ from the real system.
[pubsub] Mar 08, 2024 7:46:19 PM com.google.cloud.pubsub.testing.v1.Main main
[pubsub] INFO: IAM integration is disabled. IAM policy methods and ACL checks are not supported
[pubsub] SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
[pubsub] SLF4J: Defaulting to no-operation (NOP) logger implementation
[pubsub] SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[pubsub] Mar 08, 2024 7:46:21 PM com.google.cloud.pubsub.testing.v1.Main main
[pubsub] INFO: Server started, listening on 8085
In another terminal, I followed the "Using the emulator" documentation <https://cloud.google.com/pubsub/docs/emulator#using_the_emulator> to create the fake topic and subscription with the publisher.py <https://raw.githubusercontent.com/googleapis/python-pubsub/main/samples/snippets/publisher.py> and subscriber.py <https://raw.githubusercontent.com/googleapis/python-pubsub/main/samples/snippets/subscriber.py> scripts, and then ran the pipeline:
*set PUBSUB_EMULATOR_HOST=localhost:8085*
*set PUBSUB_PROJECT_ID=beam-sample*
*publisher.py beam-sample create topic*
Created topic: projects/beam-sample/topics/topic
*subscriber.py beam-sample create topic subscription*
Subscription created: name: "projects/beam-sample/subscriptions/subscription"
topic: "projects/beam-sample/topics/topic"
push_config {
}
ack_deadline_seconds: 10
message_retention_duration {
seconds: 604800
}
*publisher.py beam-sample publish topic*
1
2
3
4
5
6
7
8
9
Published messages to projects/beam-sample/topics/topic.
The *Direct Runner* looped in streaming mode as expected (although on my system it did not fire every 10 seconds):
*eternal_pubsub.py --streaming true*
Starting pipeline...
Message number 1
Message number 2
Message number 3
Message number 4
Message number 5
Message number 6
Message number 7
Message number 8
Message number 9
Messages from PubSub :9
Messages from PubSub :1
Messages from PubSub :1
Messages from PubSub :1
…
According to this post <https://stackoverflow.com/questions/68342095/error-while-running-beam-streaming-pipeline-python-with-pub-sub-io-in-embedded>, the FlinkRunner doesn't support the Pub/Sub I/O, but I guess Kafka or another existing unbounded PCollection source would work. As I mentioned in my first post, the unbounded sources I created in the past used the "old I/O Java Source".
To summarize, the issue seems to be less about support for GroupBy operations and more about unbounded collections. I'll keep investigating; for the time being, the approach we'll take is micro-batching. I just wanted to close the loop and thank the team for your kind responses.
Regards,
JP
INTERNAL USE
*From:* Puertos tavares, Jose J (Canada) via user <user@beam.apache.org>
*Sent:* Friday, March 8, 2024 7:28 PM
*To:* Robert Bradshaw <rober...@google.com>; user@beam.apache.org
*Cc:* Puertos tavares, Jose J (Canada) <jose_j_puertos_tava...@homedepot.com>
*Subject:* [EXTERNAL] Re: [Question] Python Streaming Pipeline Support
Hello Robert:
Thanks for your answer; however, as I posted at the beginning, I tested it with Flink as well.
What's more interesting is that Reshuffle is a native (balancing) operation, but it still doesn't seem to progress in streaming.
Were you able to run it successfully with the expected behavior?
I'm running the latest version, 2.54.0, on Windows.
Regards
JP
------------------------------------------------------------------------
*From:* Robert Bradshaw <rober...@google.com>
*Sent:* Friday, March 8, 2024 6:49 PM
*To:* user@beam.apache.org <user@beam.apache.org>
*Cc:* XQ Hu <x...@google.com>; Puertos tavares, Jose J (Canada) <jose_j_puertos_tava...@homedepot.com>
*Subject:* [EXTERNAL] Re: [Question] Python Streaming Pipeline Support
The Python Local Runner has limited support for streaming pipelines.
For the time being, I would recommend using Dataflow or Flink (the latter can be run locally) to try out streaming pipelines.
On Fri, Mar 8, 2024 at 2:11 PM Puertos tavares, Jose J (Canada) via
user <user@beam.apache.org> wrote:
>
> Hello Hu:
>
> Not really. As you have coded it, the pipeline finishes according to stop_timestamp=time.time() + 16; only after it finishes emitting does everything else get output, and then the pipeline terminates in batch mode.
>
> You can rule out STDOUT issues and confirm this behavior by putting a ParDo after the GroupBy that throws an exception, writes temporary files or makes HTTP requests. This ParDo won't be executed until your PeriodicImpulse terminates (you can extend it to +60 and see that it is not triggered on your 4-second window, but only once the impulse stops generating).
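The behaviour described above is consistent with how a GroupBy over fixed windows fires under the default trigger: a window's pane is emitted only once the watermark reaches the end of that window. When a bounded source (such as PeriodicImpulse with a stop_timestamp) finishes, the watermark can advance to the end of time in one step, so all windows appear to fire together; if the watermark never advances, nothing is emitted. The toy model below makes that rule concrete; `fired_windows` is a hypothetical helper and not Beam's implementation.

```python
import math


def fired_windows(timestamps, size, watermark):
    """Toy model of the default trigger on fixed windows (not Beam's implementation).

    A window [start, start + size) emits its pane only once the watermark has
    reached the end of the window; elements in later windows stay buffered.
    """
    panes = {}
    for ts in timestamps:
        start = ts - ts % size  # fixed window with zero offset
        if start + size <= watermark:
            panes.setdefault(start, []).append(ts)
    return panes


impulses = list(range(16))  # one element per second for 16 seconds
print(fired_windows(impulses, 4, watermark=0))         # watermark never advanced: {}
print(fired_windows(impulses, 4, watermark=9))         # windows starting at 0 and 4 are complete
print(fired_windows(impulses, 4, watermark=math.inf))  # bounded source finished: all four windows
```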
>
> I am looking for something that is really streaming and executes constantly, so that in this case, every 4 seconds, the window would process its elements and then wait for the next window to accumulate.
>
> Regards,
>
> JP
>
> From: XQ Hu <x...@google.com>
> Sent: Friday, March 8, 2024 3:51 PM
> To: user@beam.apache.org
> Cc: Puertos tavares, Jose J (Canada) <jose_j_puertos_tava...@homedepot.com>
> Subject: [EXTERNAL] Re: [Question] Python Streaming Pipeline Support
>
>
>
> Is this what you are looking for?
>
> import random
> import time
>
> import apache_beam as beam
> from apache_beam.transforms import trigger, window
> from apache_beam.transforms.periodicsequence import PeriodicImpulse
> from apache_beam.utils.timestamp import Timestamp
>
> with beam.Pipeline() as p:
>     input = (
>         p
>         | PeriodicImpulse(
>             start_timestamp=time.time(),
>             stop_timestamp=time.time() + 16,
>             fire_interval=1,
>             apply_windowing=False,
>         )
>         | beam.Map(lambda x: random.random())
>         | beam.WindowInto(window.FixedWindows(4))
>         | beam.GroupBy()
>         | "Print Windows"
>         >> beam.transforms.util.LogElements(with_timestamp=True, with_window=True)
>     )
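XQ Hu's example uses `FixedWindows(4)`. As a rough illustration (my own sketch, not Beam source), fixed windows of length `size` with offset `offset` place a timestamp `ts` in the window starting at `ts - (ts - offset) % size`. Because PeriodicImpulse is started at `time.time()`, which is generally not aligned to a 4-second boundary, 16 one-second impulses can be spread over five windows with partial first and last windows rather than four full ones. The helper names and the start time 1001.5 (an arbitrary stand-in for `time.time()`) are hypothetical.

```python
def assign_fixed_window(ts, size, offset=0.0):
    """Return the [start, end) fixed window containing timestamp ts.

    Sketch of the fixed-window arithmetic (start = ts - (ts - offset) % size);
    illustrative only, not Beam's source.
    """
    start = ts - (ts - offset) % size
    return (start, start + size)


def group_into_windows(timestamps, size, offset=0.0):
    """Bucket timestamps by the fixed window each one falls into."""
    windows = {}
    for ts in timestamps:
        windows.setdefault(assign_fixed_window(ts, size, offset), []).append(ts)
    return windows


aligned = group_into_windows(range(16), 4)                          # starts on a 4 s boundary
unaligned = group_into_windows([1001.5 + i for i in range(16)], 4)  # starts mid-window
print([len(v) for _, v in sorted(aligned.items())])    # [4, 4, 4, 4]
print([len(v) for _, v in sorted(unaligned.items())])  # [3, 4, 4, 4, 1]
```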
>
> On Fri, Mar 8, 2024 at 6:48 AM Puertos tavares, Jose J (Canada) via user <user@beam.apache.org> wrote:
>
> Hello Beam Users!
>
>
>
> I was looking into a simple example in Python of an unbounded (--streaming flag) pipeline that generates random numbers, applies a fixed window (let's say 5 seconds), then applies a grouping operation (Reshuffle) and prints the result, just to check.
>
> I noticed that this seems to work as long as there is no grouping operation (Reshuffle, GroupBy, etc.) that leverages the windowing semantics.
>
>
>
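> # Imports and argument parsing needed to make the snippet runnable
> import argparse
> import os
>
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.transforms import window
>
>
> # RandomNumberGenerator is the DoFn defined below; call run() after it is defined.
> def run(argv=None):
>     # Get parameters from the command line for the pipeline
>     parser = argparse.ArgumentParser()
>     known_args, pipeline_args = parser.parse_known_args(argv)
>     # Pass only the unparsed args (e.g. --streaming) to Beam
>     pipeline_options = PipelineOptions(pipeline_args)
>
>     # Create pipeline
>     p = beam.Pipeline(options=pipeline_options)
>
>     # Execute pipeline
>     (p
>      | "Start pipeline" >> beam.Create([0])
>      | "Get values" >> beam.ParDo(RandomNumberGenerator())
>      | "Applied fixed windows" >> beam.WindowInto(window.FixedWindows(1 * 5))
>      | "Reshuffle" >> beam.Reshuffle()
>      | "Print" >> beam.Map(lambda x: print("{} - {}".format(os.getpid(), x), flush=True))
>     )
>
>     result = p.run()
>     result.wait_until_finish()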
>
>
>
>
>
> Even though the random generator is unbounded and tagged as such with the decorator, it seems to get stuck. If I make that step finite (i.e. adding a counter and exiting), then the code works in regular batch mode.
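One way to picture why the pipeline appears stuck is the toy model below, which is not how any Beam runner is actually implemented. A fused stage pushes each element downstream as soon as the generator yields it, so element-wise steps (such as a print before the shuffle) can run, but the buffered grouping step only flushes when the bundle finishes, i.e. when the generator is exhausted. An endless `while True` loop in a regular `process` method never exhausts. `run_bundle`, `finite_numbers`, `endless_numbers` and `max_steps` are hypothetical names; `max_steps` only exists so the demo terminates.

```python
def run_bundle(generator, max_steps=None):
    """Toy fused stage: element-wise work happens per element, while the
    grouping buffer is flushed only once the generator is exhausted."""
    seen = []       # what an element-wise step (e.g. a print) observed
    buffer = []     # what a GroupBy/Reshuffle would be holding
    for step, element in enumerate(generator):
        if max_steps is not None and step >= max_steps:
            return seen, None  # bundle still open: nothing flushed downstream
        seen.append(element)
        buffer.append(element)
    return seen, list(buffer)  # bundle finished: grouped output released


def finite_numbers(n):
    for i in range(n):
        yield i


def endless_numbers():
    i = 0
    while True:  # mirrors the while True loop in RandomNumberGenerator.process
        yield i
        i += 1


print(*run_bundle(finite_numbers(3)))               # [0, 1, 2] [0, 1, 2]
print(*run_bundle(endless_numbers(), max_steps=5))  # [0, 1, 2, 3, 4] None
```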
>
>
>
> # =============================================================================
> # Class for a splittable DoFn that generates random numbers
> # =============================================================================
>
> import apache_beam as beam
>
>
> class RandomNumberGenerator(beam.DoFn):
>
>     # unbounded_per_element() is meant for the process method, so the duplicate
>     # class-level decorator was dropped. As far as I know, without a
>     # RestrictionParam in process() this is still a regular, non-splittable DoFn,
>     # so the while True loop never hands control back to the runner.
>     @beam.transforms.core.DoFn.unbounded_per_element()
>     def process(self, element):
>         import random
>         import time
>
>         counter = 0
>
>         while True:
>             # if counter > 5:
>             #     break
>             nmb = random.randint(0, 1000)
>             wait = random.randint(0, 5)
>             rnow = time.time()
>
>             print("Randy random", nmb)
>
>             # Emit the number with the current wall-clock time as its event time
>             yield beam.window.TimestampedValue(nmb, rnow)
>             time.sleep(wait)
>             counter += 1
>
>
>
> I have tried to implement the restriction tracker and watermark estimator as per the documentation, but none of that seems to work for either the DirectRunner or the FlinkRunner (even there, where Reshuffle is not a custom operation but a vertex between the different ParDos). It just seems to get stuck.
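The idea behind the restriction tracker can be sketched without Beam: instead of looping forever, the DoFn claims positions from a restriction, stops after some budget, and hands the unclaimed remainder back to the runner (a self-checkpoint), which lets the bundle finish and the watermark move. The toy classes below only mirror the `try_claim` idea; `ToyOffsetTracker`, `process_with_checkpoint` and `budget` are hypothetical names, not Beam's `OffsetRestrictionTracker` API.

```python
import math


class ToyOffsetTracker:
    """Toy stand-in for a restriction tracker over [start, stop).
    Not Beam's OffsetRestrictionTracker; it only mirrors the try_claim idea."""

    def __init__(self, start, stop=math.inf):
        self.start = start
        self.stop = stop  # math.inf models an unbounded restriction
        self.last_claimed = None

    def try_claim(self, position):
        if position >= self.stop:
            return False
        self.last_claimed = position
        return True


def process_with_checkpoint(tracker, budget):
    """Emit up to `budget` positions, then return the unclaimed remainder
    (a self-checkpoint) instead of looping forever."""
    outputs = []
    position = tracker.start
    while len(outputs) < budget and tracker.try_claim(position):
        outputs.append(position)
        position += 1
    residual = (position, tracker.stop) if position < tracker.stop else None
    return outputs, residual


out, residual = process_with_checkpoint(ToyOffsetTracker(0), budget=3)
print(out, residual)  # [0, 1, 2] (3, inf)
out, residual = process_with_checkpoint(ToyOffsetTracker(*residual), budget=3)
print(out, residual)  # [3, 4, 5] (6, inf)
```

In this model the runner, not the DoFn, decides when to resume the residual, which is the part the `while True` version never allows.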
>
>
>
> I even tried using the native PeriodicImpulse [beam.apache.org] to factor out my own implementation, but I still got the same result of it being 'stuck' on the GroupBy/Reshuffle operation.
>
>
>
> In the past I have created streaming pipelines with an Unbounded Source in the Java SDK (now obsolete, it seems, according to the docs). However, I noticed that most of the unbounded Python readers, like Kafka [beam.apache.org] and PubSub [beam.apache.org], use ExternalTransforms behind the scenes, so I am starting to wonder whether such unbounded sources are supported natively in Python at all.
>
>
>
> I have done some Internet searching and even tried LLMs to get a suggestion, but I haven't been able to find a clear answer on how to achieve this in Python, or whether it is even possible. After spending a couple of days on it, I figured I could ask the Beam team and hear your thoughts. If you can point me to any sample that might work, so I can analyze it further and understand what is missing, it would be greatly appreciated.
>
> Regards,
>
> JP – A fellow Apache Beam enthusiast
>
>
>
> ________________________________
>
>
> The information in this Internet Email is confidential and may be legally
privileged. It is intended solely for the addressee. Access to this Email by
anyone else is unauthorized. If you are not the intended recipient, any
disclosure, copying, distribution or any action taken or omitted to be taken in
reliance on it, is prohibited and may be unlawful. When addressed to our clients
any opinions or advice contained in this Email are subject to the terms and
conditions expressed in any applicable governing The Home Depot terms of business
or client engagement letter. The Home Depot disclaims all responsibility and
liability for the accuracy and content of this attachment and for any damages or
losses arising from any inaccuracies, errors, viruses, e.g., worms, trojan horses,
etc., or other items of a destructive nature, which may be contained in this
attachment and shall not be liable for direct, indirect, consequential or special
damages in connection with this e-mail message or its attachment.