Re: [Question] Python Streaming Pipeline Support

2024-03-08 Thread Wiśniowski Piotr
…collections. I'll keep investigating, and for the time being the approach we'll take is micro-batching. Just wanted to close the loop and thank the team for your kind responses.

Regards,

JP


RE: [Question] Python Streaming Pipeline Support

2024-03-08 Thread Puertos tavares, Jose J (Canada) via user

Hello group:

I believe it might be interesting to share what I have found so far with your feedback, as I have corroborated that the DirectRunner and FlinkRunner DO work on streaming; it seems to be more of a constraint on the definition of the PCollection rather than on the operators, as shown in my code 
https://play.beam.apache.org/?sdk=python&shared=TJNavCeJ_DS


Based on the fact that most samples leverage a messaging system as the streaming 
source, I decided to use the Google Cloud PubSub 
emulator<https://cloud.google.com/pubsub/docs/emulator> to set up a scenario where 
I push a message to the topic at the beginning and create a pipeline that 
consumes the message from a subscription, applies the windowing, applies the 
group-by operation, and at the end pushes the message again, hence providing 
a forever loop of consumption for streaming.
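
A minimal sketch of that kind of pipeline (my own reconstruction rather than the exact playground code; the topic and subscription paths match the emulator setup below, while the 10-second window and the b"ping" payload pushed back to the topic are assumptions):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window

# Assumes PUBSUB_EMULATOR_HOST and PUBSUB_PROJECT_ID are exported as shown below.
SUBSCRIPTION = "projects/beam-sample/subscriptions/subscription"
TOPIC = "projects/beam-sample/topics/topic"

options = PipelineOptions(streaming=True)

with beam.Pipeline(options=options) as p:
    _ = (
        p
        | "Read" >> beam.io.ReadFromPubSub(subscription=SUBSCRIPTION)
        | "Window" >> beam.WindowInto(window.FixedWindows(10))
        | "Group" >> beam.GroupBy()
        # Count the grouped elements, log them, and emit a new message so the
        # subscription never runs dry.
        | "Count and log" >> beam.Map(
            lambda kv: print("Messages from PubSub :%d" % sum(1 for _ in kv[1]),
                             flush=True) or b"ping")
        | "Loop back" >> beam.io.WriteToPubSub(topic=TOPIC)
    )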


gcloud beta emulators pubsub start --project=beam-sample

Executing: cmd /c …. cloud-pubsub-emulator.bat --host=localhost --port=8085
[pubsub] This is the Google Pub/Sub fake.
[pubsub] Implementation may be incomplete or differ from the real system.
[pubsub] Mar 08, 2024 7:46:19 PM com.google.cloud.pubsub.testing.v1.Main main
[pubsub] INFO: IAM integration is disabled. IAM policy methods and ACL checks 
are not supported
[pubsub] SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
[pubsub] SLF4J: Defaulting to no-operation (NOP) logger implementation
[pubsub] SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for 
further details.
[pubsub] Mar 08, 2024 7:46:21 PM com.google.cloud.pubsub.testing.v1.Main main
[pubsub] INFO: Server started, listening on 8085


In another terminal I then ran the setup and the pipeline, following the 
using-the-emulator 
documentation<https://cloud.google.com/pubsub/docs/emulator#using_the_emulator> 
to create the fake topic and subscription with the 
publisher.py<https://raw.githubusercontent.com/googleapis/python-pubsub/main/samples/snippets/publisher.py>
 and 
subscriber.py<https://raw.githubusercontent.com/googleapis/python-pubsub/main/samples/snippets/subscriber.py>
 scripts:

set PUBSUB_EMULATOR_HOST=localhost:8085
set PUBSUB_PROJECT_ID=beam-sample

publisher.py beam-sample create topic
Created topic: projects/beam-sample/topics/topic

subscriber.py beam-sample create topic subscription
Subscription created: name: "projects/beam-sample/subscriptions/subscription"
topic: "projects/beam-sample/topics/topic"
push_config {
}
ack_deadline_seconds: 10
message_retention_duration {
  seconds: 604800
}


publisher.py beam-sample publish topic
1
2
3
4
5
6
7
8
9
Published messages to projects/beam-sample/topics/topic.




The Direct Runner looped in streaming as expected (although on my system it wasn't 
every 10 seconds):
eternal_pubsub.py --streaming true
Starting pipeline...
Message number 1
Message number 2
Message number 3
Message number 4
Message number 5
Message number 6
Message number 7
Message number 8
Message number 9
Messages from PubSub :9
Messages from PubSub :1
Messages from PubSub :1
Messages from PubSub :1

…


As per this 
post<https://stackoverflow.com/questions/68342095/error-while-running-beam-streaming-pipeline-python-with-pub-sub-io-in-embedded>, 
the FlinkRunner doesn't support the PubSub operator, but I guess Kafka or another 
existing unbounded PCollection generator would work; as I mentioned in my first 
post, the ones that I have created are with the "old I/O Java Source".

To summarize, it seems the issue is less about support for group-by operations and 
more about the unbounded collections themselves. I'll keep investigating, and for 
the time being the approach we'll take is micro-batching. Just wanted to close the 
loop and thank the team for your kind responses.

Regards,
JP




Re: [Question] Python Streaming Pipeline Support

2024-03-08 Thread Puertos tavares, Jose J (Canada) via user
Hello Robert:

Thanks for your answer; however, as I posted at the beginning, I tested it with 
Flink as well.
What is more interesting is that Reshuffle is a native (rebalance) operation, but 
it still doesn't seem to progress with streaming.
Were you able to run it successfully with the expected behavior? Here I am 
running the latest 2.54.0 on Windows.

Regards
JP


From: Robert Bradshaw 
Sent: Friday, March 8, 2024 6:49 PM
To: user@beam.apache.org 
Cc: XQ Hu ; Puertos tavares, Jose J (Canada) 

Subject: [EXTERNAL] Re: [Question] Python Streaming Pipeline Support


The Python Local Runner has limited support for streaming pipelines.
For the time being I would recommend using Dataflow or Flink (the latter
can be run locally) to try out streaming pipelines.

Re: [Question] Python Streaming Pipeline Support

2024-03-08 Thread Robert Bradshaw via user
The Python Local Runner has limited support for streaming pipelines.
For the time being I would recommend using Dataflow or Flink (the latter
can be run locally) to try out streaming pipelines.
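
For reference, running the same Python script on a locally embedded Flink cluster is usually just a matter of switching the runner flag (the script name below is a placeholder, and a local Java installation is required for the Flink job server):

python my_streaming_pipeline.py --runner=FlinkRunner --streaming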


RE: [Question] Python Streaming Pipeline Support

2024-03-08 Thread Puertos tavares, Jose J (Canada) via user
Hello Hu:

Not really. This one, as you have coded it, finishes as per 
stop_timestamp=time.time() + 16, and after it finishes emitting, everything else 
gets output and the pipeline terminates in batch mode.

You can rule out STDOUT issues and confirm this behavior by putting a ParDo after 
the GroupBy that throws an exception, writes temporary files, or makes HTTP 
requests. That ParDo won't be executed until your PeriodicImpulse terminates (you 
can extend it to +60 and see that it is not triggered on your 4-second window, 
but only once it stops generating).

I am looking for something that is really streaming and executes constantly; in 
this case, every 4 seconds the window would process the elements in the window 
and then wait for the next window to accumulate.
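
For context, a sketch of what an unbounded variant of that snippet might look like, assuming MAX_TIMESTAMP as the stop time (which I understand to be the default) and an explicit streaming option; whether the local runner then fires the 4-second windows is exactly the open question here:

import random
import time

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.utils.timestamp import MAX_TIMESTAMP

options = PipelineOptions(streaming=True)

with beam.Pipeline(options=options) as p:
    _ = (
        p
        | PeriodicImpulse(
            start_timestamp=time.time(),
            stop_timestamp=MAX_TIMESTAMP,  # never stop emitting
            fire_interval=1,
            apply_windowing=False,
        )
        | beam.Map(lambda x: random.random())
        | beam.WindowInto(window.FixedWindows(4))
        | beam.GroupBy()
        | beam.Map(print)
    )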

Regards,
JP







Re: [Question] Python Streaming Pipeline Support

2024-03-08 Thread XQ Hu via user
Is this what you are looking for?

import random
import time

import apache_beam as beam
from apache_beam.transforms import trigger, window
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.utils.timestamp import Timestamp

with beam.Pipeline() as p:
    input = (
        p
        | PeriodicImpulse(
            start_timestamp=time.time(),
            stop_timestamp=time.time() + 16,
            fire_interval=1,
            apply_windowing=False,
        )
        | beam.Map(lambda x: random.random())
        | beam.WindowInto(window.FixedWindows(4))
        | beam.GroupBy()
        | "Print Windows"
        >> beam.transforms.util.LogElements(with_timestamp=True, with_window=True)
    )


[Question] Python Streaming Pipeline Support

2024-03-08 Thread Puertos tavares, Jose J (Canada) via user
Hello Beam Users!

I was looking into a simple example in Python of an unbounded (--streaming flag) 
pipeline that generates random numbers, applies a fixed window (let's say 5 
seconds), then applies a group-by operation (reshuffle) and prints the result, 
just to check.

I notice that this seems to work as long as there is no grouping operation 
(reshuffle, groupBy, etc.) that would leverage the windowing semantics.

#Get Parameters from Command Line for the Pipeline
known_args, pipeline_options = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(flags=argv)

#Create pipeline
p = beam.Pipeline(options=pipeline_options)


#Execute Pipeline
(p | "Start pipeline " >> beam.Create([0])
| "Get values"  >> beam.ParDo(RandomNumberGenerator())
| 'Applied fixed windows ' >> beam.WindowInto( window.FixedWindows(1*5) )
| 'Reshuffle ' >> beam.Reshuffle()
|  "Print" >> beam.Map(lambda x: print ("{} - {} ".format(os.getpid(), x) 
,flush=True ) )
)

result = p.run()
result.wait_until_finish()


Even though the random generator is unbounded and tagged as such with the 
decorator, it seems to get stuck; if I make that step finite (i.e. adding a 
counter and exiting), then the code works in regular batch mode.

# =
# Class for Splittable DoFn: randomly generated numbers
# =

@beam.transforms.core.DoFn.unbounded_per_element()
class RandomNumberGenerator(beam.DoFn):

    @beam.transforms.core.DoFn.unbounded_per_element()
    def process(self, element):
        import random
        import time

        counter = 0

        while True:
            # if counter > 5:
            #     break
            nmb = random.randint(0, 1000)
            wait = random.randint(0, 5)
            rnow = time.time()

            print("Randy random", nmb)

            yield beam.window.TimestampedValue(nmb, rnow)
            time.sleep(wait)
            counter += 1

I have tried to implement the tracker and watermark as per the documentation, but 
none of that seems to work for either the DirectRunner or the FlinkRunner (even 
though there Reshuffle is not a custom operation but a vertex between the 
different ParDos). It seems to just get stuck.
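
For reference, a rough sketch of the tracker/watermark wiring as I understand it from the splittable DoFn documentation; the provider class, the effectively infinite offset range, and the claim-one-element-then-defer loop are my own assumptions and would need to be checked against the SDK version in use:

import random
import time

import apache_beam as beam
from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker
from apache_beam.io.watermark_estimators import ManualWatermarkEstimator
from apache_beam.transforms.core import RestrictionProvider
from apache_beam.utils.timestamp import Timestamp


class InfiniteRestrictionProvider(RestrictionProvider):
    # An effectively infinite offset range stands in for "unbounded work".
    def initial_restriction(self, element):
        return OffsetRange(0, 2**63 - 1)

    def create_tracker(self, restriction):
        return OffsetRestrictionTracker(restriction)

    def restriction_size(self, element, restriction):
        return restriction.size()


class RandomNumberGenerator(beam.DoFn):

    @beam.DoFn.unbounded_per_element()
    def process(
            self,
            element,
            tracker=beam.DoFn.RestrictionParam(InfiniteRestrictionProvider()),
            watermark_estimator=beam.DoFn.WatermarkEstimatorParam(
                ManualWatermarkEstimator.default_provider())):
        position = tracker.current_restriction().start
        if not tracker.try_claim(position):
            return
        now = time.time()
        # Advance the output watermark so downstream windows can close.
        watermark_estimator.set_watermark(Timestamp.of(now))
        yield beam.window.TimestampedValue(random.randint(0, 1000), now)
        time.sleep(1)
        # Checkpoint and ask the runner to call process() again with the
        # remainder of the restriction, so elements keep flowing forever.
        tracker.defer_remainder()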

I even tried using the native PeriodicImpulse to factor out my own implementation, 
however I still got the same result of it being 'stuck' on the GroupBy/Reshuffle 
operation.

In the past I have created streaming pipelines with the Java SDK using an 
UnboundedSource (now obsolete, it seems, according to the docs); however, I 
noticed that most of the unbounded Python readers, like Kafka and PubSub, use 
external transforms behind the scenes, so I am starting to wonder whether such 
unbounded sources are supported natively in Python at all.
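
For what it is worth, those cross-language reads are still driven from Python code; a sketch of the Kafka case (the broker address and topic name are placeholders, and a Java runtime is needed for the expansion service Beam starts behind the scenes):

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(streaming=True)

with beam.Pipeline(options=options) as p:
    _ = (
        p
        # Expanded via a Java expansion service into the actual KafkaIO read.
        | ReadFromKafka(
            consumer_config={"bootstrap.servers": "localhost:9092"},
            topics=["numbers"])
        | beam.Map(print)
    )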

I have done some Internet searching and even tried LLMs for suggestions, but I 
have not been successful in getting a clear answer on how to achieve this in 
Python, or whether it is even possible. After spending a couple of days on it, I 
figured I would ask the Beam team and hear your thoughts; if you can point me to 
any sample that works so I can analyze it and understand what is missing, that 
would be greatly appreciated.



Regards,
JP - A fellow Apache Beam enthusiast



The information in this Internet Email is confidential and may be legally 
privileged. It is intended solely for the addressee. Access to this Email by 
anyone else is unauthorized. If you are not the intended recipient, any 
disclosure, copying, distribution or any action taken or omitted to be taken in 
reliance on it, is prohibited and may be unlawful. When addressed to our 
clients any opinions or advice contained in this Email are subject to the terms 
and conditions expressed in any applicable governing The Home Depot terms of 
business or client engagement letter. The Home Depot disclaims all 
responsibility and liability for the accuracy and content of this attachment 
and for any damages or losses arising from any inaccuracies, errors, viruses, 
e.g., worms, trojan horses, etc., or other items of a destructive nature, which 
may be contained in this attachment and shall not be liable for direct, 
indirect, consequential or special damages in connection with this e-mail 
message or its attachment.


INTERNAL USE