Re: [Question] Python Streaming Pipeline Support

2024-03-08 Thread Wiśniowski Piotr

Hi,

There are two more things that I would suggest trying:

1. Pass `streaming=True` to the pipeline options:

    PipelineOptions(
        beam_args,
        streaming=True,
    )

The `streaming` flag changes the mode in which the runners operate. I am not
sure why, but I found this was required to get behavior similar to what you
want. It might be required even if you do have unbounded sources in the
pipeline.

I use this successfully on both the DirectRunner and Dataflow.

2. Use `TestStream`:

    from apache_beam.testing.test_stream import ElementEvent, TestStream, WatermarkEvent

Please check out these classes. This is the most flexible and proper way
to test streaming pipelines. Moreover, it lets you control how the
watermark progresses at the source, and maybe even processing-time triggers
(but I have not managed to make those work yet). I use this heavily to
simulate PubSub and Kafka sources, and even intermediate pipeline stages
too. It is rather poorly documented, and it took me a few days to grasp how
to handle multiple input streams, but I do plan to write a post with my
findings.


I think I went through a similar days-long investigation a few months ago. Let me
know if this is helpful.


Best

Wiśniowski Piotr


On 9.03.2024 03:29, Puertos tavares, Jose J (Canada) via user wrote:


RE: [Question] Python Streaming Pipeline Support

2024-03-08 Thread Puertos tavares, Jose J (Canada) via user

Hello group:

I believe it might be interesting to share what I have found so far with your
feedback. I have corroborated that the Direct Runner and Flink Runner DO
work in streaming mode; the constraint seems to be on how the PCollection is
defined rather than on the operators, as shown in my code:
https://play.beam.apache.org/?sdk=python=TJNavCeJ_DS


Since most samples use a messaging system as the streaming source, I decided
to use the Google Cloud PubSub emulator. In my setup I push a message to the
topic at the beginning and create a pipeline that consumes the message from a
subscription, applies the windowing, applies the group-by operation, and at
the end publishes the message again, which provides a never-ending loop of
consumption for streaming.


gcloud beta emulators pubsub start --project=beam-sample

Executing: cmd /c …. cloud-pubsub-emulator.bat --host=localhost --port=8085
[pubsub] This is the Google Pub/Sub fake.
[pubsub] Implementation may be incomplete or differ from the real system.
[pubsub] Mar 08, 2024 7:46:19 PM com.google.cloud.pubsub.testing.v1.Main main
[pubsub] INFO: IAM integration is disabled. IAM policy methods and ACL checks 
are not supported
[pubsub] SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
[pubsub] SLF4J: Defaulting to no-operation (NOP) logger implementation
[pubsub] SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for 
further details.
[pubsub] Mar 08, 2024 7:46:21 PM com.google.cloud.pubsub.testing.v1.Main main
[pubsub] INFO: Server started, listening on 8085


In another terminal, set up the environment and run the pipeline. I followed
the emulator documentation to create the fake topic and subscription using the
publisher.py and subscriber.py scripts:

set PUBSUB_EMULATOR_HOST=localhost:8085
set PUBSUB_PROJECT_ID=beam-sample

publisher.py beam-sample create topic
Created topic: projects/beam-sample/topics/topic

subscriber.py beam-sample create topic subscription
Subscription created: name: "projects/beam-sample/subscriptions/subscription"
topic: "projects/beam-sample/topics/topic"
push_config {
}
ack_deadline_seconds: 10
message_retention_duration {
  seconds: 604800
}


publisher.py beam-sample publish topic
1
2
3
4
5
6
7
8
9
Published messages to projects/beam-sample/topics/topic.




Direct Runner looped in streaming as expected (although on my system it wasn’t
every 10 seconds):

eternal_pubsub.py --streaming true
Starting pipeline...
Message number 1
Message number 2
Message number 3
Message number 4
Message number 5
Message number 6
Message number 7
Message number 8
Message number 9
Messages from PubSub :9
Messages from PubSub :1
Messages from PubSub :1
Messages from PubSub :1

…


As per this post, FlinkRunner doesn’t support the PubSub operator, but I guess
Kafka or another existing unbounded PCollection generator would work. As I
mentioned in my first post, the ones that I have created are with the “old I/O
Java Source”.

To summarize, the limitation seems to be less about support for group-by
operations and more about unbounded collections. I’ll keep investigating; for
the time being the approach we’ll take is micro-batching. I just wanted to
close the loop and thank the team for your kind responses.

Regards,
JP



INTERNAL USE

From: Puertos tavares, Jose J (Canada) via user 
Sent: Friday, March 8, 2024 7:28 PM
To: Robert Bradshaw ; user@beam.apache.org
Cc: Puertos tavares, Jose J (Canada) 
Subject: [EXTERNAL] Re: [Question] Python Streaming Pipeline Support

Hello Robert:

Thanks for your answer. However, as I posted at the beginning, I tested it with
Flink as well.
It is even more interesting there, as Reshuffle is a native (balance) operation
but it still doesn't seem to progress with streaming.
Were you able to run it successfully with the expected behavior? I am
running the latest 2.54.0 on Windows.

Regards
JP


From: Robert Bradshaw <rober...@google.com>
Sent: Friday, March 8, 2024 6:49 PM
To: user@beam.apache.org
Cc: XQ Hu <x...@google.com>; Puertos tavares, Jose J (Canada)
<jose_j_puertos_tava...@homedepot.com>
Subject: [EXTERNAL] Re: [Question] Python Streaming Pipeline Support

The 

Re: [Question] Python Streaming Pipeline Support

2024-03-08 Thread Puertos tavares, Jose J (Canada) via user
Hello Robert:

Thanks for your answer. However, as I posted at the beginning, I tested it with
Flink as well.
It is even more interesting there, as Reshuffle is a native (balance) operation
but it still doesn't seem to progress with streaming.
Were you able to run it successfully with the expected behavior? I am
running the latest 2.54.0 on Windows.

Regards
JP


From: Robert Bradshaw 
Sent: Friday, March 8, 2024 6:49 PM
To: user@beam.apache.org 
Cc: XQ Hu ; Puertos tavares, Jose J (Canada) 

Subject: [EXTERNAL] Re: [Question] Python Streaming Pipeline Support

The Python Local Runner has limited support for streaming pipelines.
For the time being I would recommend using Dataflow or Flink (the latter
can be run locally) to try out streaming pipelines.

On Fri, Mar 8, 2024 at 2:11 PM Puertos tavares, Jose J (Canada) via
user  wrote:
>
> Hello  Hu:
>
>
>
> Not really. As you have coded it, this one finishes as per
> stop_timestamp=time.time() + 16; only after it finishes emitting does
> everything else get output, and the pipeline terminates in batch mode.
>
>
>
> You can rule out STDOUT issues and confirm this behavior by putting a ParDo
> after the GroupBy that throws an exception, writes temporary files, or makes
> HTTP requests. This ParDo won’t be executed until your PeriodicImpulse
> terminates (you can extend it to +60 and see that it is not triggered on
> your 4-second window, but only once it stops generating).
>
>
>
> I am looking for something that is really streaming and executes constantly,
> so that in this case, every 4 seconds, the window would process its elements
> and then wait for the next window to accumulate.
>
>
>
> Regards,
>
> JP
>
>
>
>
>
>
>
>
>
>
>
> From: XQ Hu 
> Sent: Friday, March 8, 2024 3:51 PM
> To: user@beam.apache.org
> Cc: Puertos tavares, Jose J (Canada) 
> Subject: [EXTERNAL] Re: [Question] Python Streaming Pipeline Support
>
>
>
> Is this what you are looking for?
>
>
>
> import random
> import time
>
> import apache_beam as beam
> from apache_beam.transforms import trigger, window
> from apache_beam.transforms.periodicsequence import PeriodicImpulse
> from apache_beam.utils.timestamp import Timestamp
>
> with beam.Pipeline() as p:
>     input = (
>         p
>         | PeriodicImpulse(
>             start_timestamp=time.time(),
>             stop_timestamp=time.time() + 16,
>             fire_interval=1,
>             apply_windowing=False,
>         )
>         | beam.Map(lambda x: random.random())
>         | beam.WindowInto(window.FixedWindows(4))
>         | beam.GroupBy()
>         | "Print Windows"
>         >> beam.transforms.util.LogElements(with_timestamp=True, with_window=True)
>     )
>
>
>
> On Fri, Mar 8, 2024 at 6:48 AM Puertos tavares, Jose J (Canada) via user 
>  wrote:
>
> Hello Beam Users!
>
>
>
> I was looking into a simple example in Python of an unbounded (--streaming
> flag) pipeline that generates random numbers, applies a Fixed Window (let’s
> say 5 seconds), then applies a group-by operation (reshuffle) and prints
> the result just to check.
>
>
>
> I notice that this seems to work as long as there is no grouping operation
> (reshuffle, groupBy, etc.) that would leverage the windowing semantics.
>
>
>
> # Get parameters from the command line for the pipeline
> known_args, pipeline_options = parser.parse_known_args(argv)
> pipeline_options = PipelineOptions(flags=argv)
>
> # Create pipeline
> p = beam.Pipeline(options=pipeline_options)
>
> # Execute pipeline
> (p | "Start pipeline " >> beam.Create([0])
>    | "Get values" >> beam.ParDo(RandomNumberGenerator())
>    | 'Applied fixed windows ' >> beam.WindowInto(window.FixedWindows(1*5))
>    | 'Reshuffle ' >> beam.Reshuffle()
>    | "Print" >> beam.Map(lambda x: print("{} - {} ".format(os.getpid(), x), flush=True))
> )
>
> result = p.run()
> result.wait_until_finish()
>
>
>
>
>
> Even though the Random Generator is unbounded and tagged as such with the
> decorator, it seems to get stuck. If I make that step finite (i.e. adding a
> counter and exiting), then the code works in regular batch mode.
>
>
>
> # 
> =
>
> # Class for Splittable Do  Random Generatered numbers
>
> # 
> =
>
>
>

Re: Fails to run two multi-language pipelines locally?

2024-03-08 Thread Robert Bradshaw via user
The way that cross-language pipelines work is that each transform has
an attached "environment" in which its workers should be instantiated.
By default these are identified as docker images + a possible set of
dependencies. Transforms with the same environment can be colocated.
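
To make the colocation idea concrete, here is a toy model in plain Python. It is not Beam's API: the transform names and environment labels are hypothetical and only illustrate that transforms sharing an environment can share workers, while a transform with a different environment needs its own.

```python
from collections import defaultdict


def group_by_environment(transforms):
    """Map each environment label to the transforms that could share its workers."""
    groups = defaultdict(list)
    for name, environment in transforms:
        groups[environment].append(name)
    return dict(groups)


# Hypothetical transform names and environment labels, for illustration only.
pipeline = [
    ("ReadFromKafka", "java-io-expansion"),
    ("SqlTransform", "java-sql-expansion"),
    ("WriteToKafka", "java-io-expansion"),
    ("Map(parse_json)", "python-sdk"),
]

for environment, names in group_by_environment(pipeline).items():
    print(environment, names)
```

In this model the Kafka read and write end up in one group and the SQL transform in another, which matches the observation later in the thread that adding SqlTransform brings up a second container.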

There is a tension between having large environments that can be
shared and more targeted ones that have fewer dependencies.

By default the environment for SQL relies on Beam's jar built as
"sdks:java:extensions:sql:expansion-service:shadowJar" and the
environment for Kafka relies on Beam's jar for
"sdks:java:io:expansion-service:shadowJar". To use a single jar you
could create a jar with both of these (plus whatever else you need)
and use that. The easiest would be to pass the flag

--beamServices='{
  "sdks:java:extensions:sql:expansion-service:shadowJar": "/path/to/uber.jar",
  "sdks:java:io:expansion-service:shadowJar": "/path/to/uber.jar"
}'

That being said, it should work just fine with distinct jars on an
arbitrary setup as well (and the fact that it works locally hints to
something being up with that setup). Not sure what your flink master
configuration is like, but maybe it's a docker-in-docker issue?
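
If the flag is assembled by a script, a small helper avoids quoting mistakes in the JSON. This is only a sketch: the helper name `beam_services_flag` is hypothetical, the two target names are copied from the message above, and the flag spelling is taken from it as well, so pass a different `flag` value if your SDK expects another spelling.

```python
import json
import shlex

# Gradle targets quoted in the message above.
SQL_TARGET = "sdks:java:extensions:sql:expansion-service:shadowJar"
IO_TARGET = "sdks:java:io:expansion-service:shadowJar"


def beam_services_flag(jar_path, targets=(SQL_TARGET, IO_TARGET), flag="--beamServices"):
    """Return a shell-quoted flag that points every target at the same jar."""
    mapping = {target: jar_path for target in targets}
    return shlex.quote(f"{flag}={json.dumps(mapping)}")


print(beam_services_flag("/path/to/uber.jar"))
```

The printed string can be pasted onto the command line as a single argument.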

On Wed, Mar 6, 2024 at 8:30 PM Jaehyeon Kim  wrote:
>
> It may be a configuration issue. It works if I don't specify flink_master, 
> which uses an embedded flink cluster.
>
> On Thu, 7 Mar 2024 at 12:47, Jaehyeon Kim  wrote:
>>
>> It works fine if I only use Kafka read/write, as I only see a single container
>> - two transforms (read and write) but a single container.
>>
>> If I add SqlTransform, however, another container is created and it begins
>> to produce an error. My speculation is that the containers don't recognise
>> each other and get killed by the Flink task manager. I see containers being
>> created and killed repeatedly.
>>
>> Does every multi-language pipeline run in a separate container?
>>
>> On Thu, 7 Mar 2024, 12:35 pm Robert Bradshaw via user, 
>>  wrote:
>>>
>>> Oh, sorry, I didn't see that.
>>>
>>> I would look earlier in the logs and see why it failed to bring up the
>>> containers (or, if they started, look in the container logs to see why
>>> they died).
>>>
>>> On Wed, Mar 6, 2024 at 5:28 PM Jaehyeon Kim  wrote:
>>> >
>>> > I am not using the python local runner but the flink runner. A flink 
>>> > cluster is started locally.
>>> >
>>> > On Thu, 7 Mar 2024 at 12:13, Robert Bradshaw via user 
>>> >  wrote:
>>> >>
>>> >> Streaming portable pipelines are not yet supported on the Python local 
>>> >> runner.
>>> >>
>>> >> On Wed, Mar 6, 2024 at 5:03 PM Jaehyeon Kim  wrote:
>>> >> >
>>> >> > Hello,
>>> >> >
>>> >> > I use the python SDK and my pipeline reads messages from Kafka and 
>>> >> > transforms via SQL. I see two containers are created but it seems that 
>>> >> > they don't communicate with each other so that the Flink task manager 
>>> >> > keeps killing the containers. The Flink cluster runs locally. Is there 
>>> >> > a way to run two multi-language pipelines (running on Docker) 
>>> >> > communicating with each other?
>>> >> >
>>> >> > Cheers,
>>> >> > Jaehyeon
>>> >> >
>>> >> >
>>> >> >
>>> >> > def run():
>>> >> >     parser = argparse.ArgumentParser(
>>> >> >         description="Process statistics by user from website visit event"
>>> >> >     )
>>> >> >     parser.add_argument(
>>> >> >         "--inputs",
>>> >> >         default="inputs",
>>> >> >         help="Specify folder name that event records are saved",
>>> >> >     )
>>> >> >     parser.add_argument(
>>> >> >         "--runner", default="FlinkRunner", help="Specify Apache Beam Runner"
>>> >> >     )
>>> >> >     opts = parser.parse_args()
>>> >> >
>>> >> >     options = PipelineOptions()
>>> >> >     pipeline_opts = {
>>> >> >         "runner": opts.runner,
>>> >> >         "flink_master": "localhost:8081",
>>> >> >         "job_name": "traffic-agg-sql",
>>> >> >         "environment_type": "LOOPBACK",
>>> >> >         "streaming": True,
>>> >> >         "parallelism": 3,
>>> >> >         "experiments": [
>>> >> >             "use_deprecated_read"
>>> >> >         ],  ## https://github.com/apache/beam/issues/20979
>>> >> >         "checkpointing_interval": "6",
>>> >> >     }
>>> >> >     options = PipelineOptions([], **pipeline_opts)
>>> >> >     # Required, else it will complain that when importing worker functions
>>> >> >     options.view_as(SetupOptions).save_main_session = True
>>> >> >
>>> >> >     query = """
>>> >> >     WITH cte AS (
>>> >> >         SELECT id, CAST(event_datetime AS TIMESTAMP) AS ts
>>> >> >         FROM PCOLLECTION
>>> >> >     )
>>> >> >     SELECT
>>> >> >         CAST(TUMBLE_START(ts, INTERVAL '10' SECOND) AS VARCHAR) AS window_start,
>>> >> >         CAST(TUMBLE_END(ts, INTERVAL '10' SECOND) AS VARCHAR) AS window_end,
>>> >> >         COUNT(*) AS page_view
>>> >> >     FROM cte
>>> >> >     GROUP BY
>>> >> >         TUMBLE(ts, INTERVAL '10' SECOND), 

Re: [Question] Python Streaming Pipeline Support

2024-03-08 Thread Robert Bradshaw via user
The Python Local Runner has limited support for streaming pipelines.
For the time being I would recommend using Dataflow or Flink (the latter
can be run locally) to try out streaming pipelines.

On Fri, Mar 8, 2024 at 2:11 PM Puertos tavares, Jose J (Canada) via
user  wrote:
>
> Hello  Hu:
>
>
>
> Not really. As you have coded it, this one finishes as per
> stop_timestamp=time.time() + 16; only after it finishes emitting does
> everything else get output, and the pipeline terminates in batch mode.
>
>
>
> You can rule out STDOUT issues and confirm this behavior by putting a ParDo
> after the GroupBy that throws an exception, writes temporary files, or makes
> HTTP requests. This ParDo won’t be executed until your PeriodicImpulse
> terminates (you can extend it to +60 and see that it is not triggered on
> your 4-second window, but only once it stops generating).
>
>
>
> I am looking for something that is really streaming and executes constantly,
> so that in this case, every 4 seconds, the window would process its elements
> and then wait for the next window to accumulate.
>
>
>
> Regards,
>
> JP
>
>
>
>
>
>
>
>
>
>
>
> From: XQ Hu 
> Sent: Friday, March 8, 2024 3:51 PM
> To: user@beam.apache.org
> Cc: Puertos tavares, Jose J (Canada) 
> Subject: [EXTERNAL] Re: [Question] Python Streaming Pipeline Support
>
>
>
> Is this what you are looking for?
>
>
>
> import random
> import time
>
> import apache_beam as beam
> from apache_beam.transforms import trigger, window
> from apache_beam.transforms.periodicsequence import PeriodicImpulse
> from apache_beam.utils.timestamp import Timestamp
>
> with beam.Pipeline() as p:
>     input = (
>         p
>         | PeriodicImpulse(
>             start_timestamp=time.time(),
>             stop_timestamp=time.time() + 16,
>             fire_interval=1,
>             apply_windowing=False,
>         )
>         | beam.Map(lambda x: random.random())
>         | beam.WindowInto(window.FixedWindows(4))
>         | beam.GroupBy()
>         | "Print Windows"
>         >> beam.transforms.util.LogElements(with_timestamp=True, with_window=True)
>     )
>
>
>
> On Fri, Mar 8, 2024 at 6:48 AM Puertos tavares, Jose J (Canada) via user 
>  wrote:
>
> Hello Beam Users!
>
>
>
> I was looking into a simple example in Python of an unbounded (--streaming
> flag) pipeline that generates random numbers, applies a Fixed Window (let’s
> say 5 seconds), then applies a group-by operation (reshuffle) and prints
> the result just to check.
>
>
>
> I notice that this seems to work as long as there is no grouping operation
> (reshuffle, groupBy, etc.) that would leverage the windowing semantics.
>
>
>
> # Get parameters from the command line for the pipeline
> known_args, pipeline_options = parser.parse_known_args(argv)
> pipeline_options = PipelineOptions(flags=argv)
>
> # Create pipeline
> p = beam.Pipeline(options=pipeline_options)
>
> # Execute pipeline
> (p | "Start pipeline " >> beam.Create([0])
>    | "Get values" >> beam.ParDo(RandomNumberGenerator())
>    | 'Applied fixed windows ' >> beam.WindowInto(window.FixedWindows(1*5))
>    | 'Reshuffle ' >> beam.Reshuffle()
>    | "Print" >> beam.Map(lambda x: print("{} - {} ".format(os.getpid(), x), flush=True))
> )
>
> result = p.run()
> result.wait_until_finish()
>
>
>
>
>
> Even though the Random Generator is unbounded and tagged as such with the
> decorator, it seems to get stuck. If I make that step finite (i.e. adding a
> counter and exiting), then the code works in regular batch mode.
>
>
>
> # =
> # Class for Splittable DoFn random generated numbers
> # =
>
> @beam.transforms.core.DoFn.unbounded_per_element()
> class RandomNumberGenerator(beam.DoFn):
>
>     @beam.transforms.core.DoFn.unbounded_per_element()
>     def process(self, element):
>         import random
>         import time
>
>         counter = 0
>
>         while True:
>
>             #if counter>5:
>             #    break
>             nmb = random.randint(0, 1000)
>             wait = random.randint(0, 5)
>             rnow = time.time()
>
>             print("Randy random", nmb)
>
>             yield beam.window.TimestampedValue(nmb, rnow)
>             time.sleep(wait)
>             counter += 1
>
>
>
> I have tried to implement as per documentation the tracker and watermark, but 
> it seems that none of that seems to work either for the DirectRunner or 
> 

Re: [Question] ReadFromKafka can't get messages.

2024-03-08 Thread Jaehyeon Kim
I've been teaching myself Beam and here is an example pipeline that uses
Kafka IO (read and write). I hope it helps.

*Prerequisites*
1. Kafka runs on Docker and its external listener is exposed on port 29092
(i.e. its bootstrap server address can be specified as localhost:29092)
2. The following entry exists in /etc/hosts, and the multi-language pipeline
(Kafka IO) runs in a Docker container that uses the Docker host
network (i.e. --network host)

*127.0.0.1   host.docker.internal*

*Key notes*
1. Use FlinkRunner with or without specifying *flink_master*. If not
specified, an embedded Flink cluster is used. Otherwise add the
*flink_master* option (e.g. "flink_master": "localhost:8081"). (Check the
--use_own argument)
2. The Kafka bootstrap server address should be specified as
*host.docker.internal:29092* (change the port if it is different).
3. Add *use_deprecated_read* as indicated in an earlier reply. Otherwise it
may get stuck.

import os
import argparse
import json
import logging
import typing

import apache_beam as beam
from apache_beam.io import kafka
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


def decode_message(kafka_kv: tuple):
    # Incoming Kafka records must have a key associated.
    # Otherwise, Beam throws an exception with null keys.
    #   Example: (b'key', b'value')
    return kafka_kv[1].decode("utf-8")


def create_message(element: dict):
    key = {"id": element["id"]}
    print(key)
    return json.dumps(key).encode("utf-8"), json.dumps(element).encode("utf-8")


def parse_json(element: str):
    return json.loads(element)


def run():
    parser = argparse.ArgumentParser(description="Kafka IO example")
    parser.add_argument(
        "--runner", default="FlinkRunner", help="Specify Apache Beam Runner"
    )
    parser.add_argument(
        "--use_own",
        action="store_true",
        help="Flag to indicate whether to use an own local cluster",
    )
    opts = parser.parse_args()

    pipeline_opts = {
        "runner": opts.runner,
        "job_name": "kafka-io",
        "environment_type": "LOOPBACK",
        "streaming": True,
        "parallelism": 3,
        "experiments": [
            "use_deprecated_read"
        ],  ## https://github.com/apache/beam/issues/20979
        "checkpointing_interval": "6",
    }
    if opts.use_own is True:
        pipeline_opts = {**pipeline_opts, **{"flink_master": "localhost:8081"}}
    print(pipeline_opts)
    options = PipelineOptions([], **pipeline_opts)
    # Required, else it will complain that when importing worker functions
    options.view_as(SetupOptions).save_main_session = True

    p = beam.Pipeline(options=options)
    (
        p
        | "Read from Kafka"
        >> kafka.ReadFromKafka(
            consumer_config={
                "bootstrap.servers": os.getenv(
                    "BOOTSTRAP_SERVERS",
                    "host.docker.internal:29092",
                ),
                "auto.offset.reset": "earliest",
                # "enable.auto.commit": "true",
                "group.id": "kafka-io",
            },
            topics=["input-topic"],
        )
        | "Decode messages" >> beam.Map(decode_message)
        | "Parse elements" >> beam.Map(parse_json)
        | "Create messages"
        >> beam.Map(create_message).with_output_types(typing.Tuple[bytes, bytes])
        | "Write to Kafka"
        >> kafka.WriteToKafka(
            producer_config={
                "bootstrap.servers": os.getenv(
                    "BOOTSTRAP_SERVERS",
                    "host.docker.internal:29092",
                )
            },
            topic="output-topic",
        )
    )

    logging.getLogger().setLevel(logging.WARN)
    logging.info("Building pipeline ...")

    p.run().wait_until_finish()


if __name__ == "__main__":
    run()


On Sat, 9 Mar 2024 at 08:12, Chamikara Jayalath via user <
user@beam.apache.org> wrote:

> Which runner are you using ?
>
> There's a known issue with SDFs not triggering for portable runners:
> https://github.com/apache/beam/issues/20979
>
> This should not occur for Dataflow.
> For Flink, you could use the option "--experiments=use_deprecated_read" to
> make it work.
>
> Thanks,
> Cham
>
> On Fri, Mar 8, 2024 at 8:10 AM LDesire  wrote:
>
>> Hello Apache Beam community.
>> I'm asking because while creating a beam pipeline in Python,
>> ReadFromKafka is not working.
>>
>> My code looks like this
>>
>> ```
>> @beam.ptransform_fn
>> def LogElements(input):
>>     def log_element(elem):
>>         print(elem)
>>         return elem
>>
>>     return (
>>         input | 'DoLog' >> Map(log_element)
>>     )
>>
>>
>> if __name__ == '__main__':
>>     consumer_config = {
>>         'bootstrap.servers': ''
>>     }
>>
>>     with beam.Pipeline(options=PipelineOptions(['--streaming'])) as p:
>>         (
>>             p | ReadFromKafka(consumer_config=consumer_config,
>> 

Re: [python] Side Inputs to CombineGlobally

2024-03-08 Thread Valentyn Tymofieiev via user
It appears to be broken and there is a known issue :
https://github.com/apache/beam/issues/19851 .

On Fri, Mar 8, 2024 at 8:40 AM Joey Tran  wrote:

> In the python SDK, should we be able to supply side inputs to
> CombineGlobally?
>
> I created an example here that fails at the pipeline translation stage
> https://play.beam.apache.org/?sdk=python=vjM_k2TvNrf
>
> It fails with
> ```
>   File
> "/Users/jtran/builds/2024-2/build/internal/lib/python3.11/site-packages/apache_beam/runners/portability/fn_api_runner/translations.py",
> line 1367, in lift_combiners
> expansion = lifted_stages if can_lift(transform) else unlifted_stages
>  ^^^
>   File
> "/Users/jtran/builds/2024-2/build/internal/lib/python3.11/site-packages/apache_beam/runners/portability/fn_api_runner/translations.py",
> line 1239, in can_lift
> context.components.pcollections[only_element(
> ^
>   File
> "/Users/jtran/builds/2024-2/build/internal/lib/python3.11/site-packages/apache_beam/runners/portability/fn_api_runner/translations.py",
> line 2082, in only_element
> element, = iterable
> 
> ValueError: too many values to unpack (expected 1)
> ```
>


RE: [Question] Python Streaming Pipeline Support

2024-03-08 Thread Puertos tavares, Jose J (Canada) via user
Hello  Hu:

Not really. As you have coded it, this one finishes as per
stop_timestamp=time.time() + 16; only after it finishes emitting does
everything else get output, and the pipeline terminates in batch mode.

You can rule out STDOUT issues and confirm this behavior by putting a ParDo
after the GroupBy that throws an exception, writes temporary files, or makes
HTTP requests. This ParDo won’t be executed until your PeriodicImpulse
terminates (you can extend it to +60 and see that it is not triggered on
your 4-second window, but only once it stops generating).

I am looking for something that is really streaming and executes constantly,
so that in this case, every 4 seconds, the window would process its elements
and then wait for the next window to accumulate.
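
The expected behavior can be modelled in plain Python, without Beam, as a sketch of how fixed windows bucket elements by timestamp. The function names are hypothetical and the input (one element per second for 16 seconds) mirrors the PeriodicImpulse settings discussed here; a streaming runner would emit each bucket once its window has passed instead of all at the end.

```python
def fixed_window(timestamp, size):
    """Return the [start, end) fixed window of `size` seconds containing `timestamp`."""
    start = timestamp - (timestamp % size)
    return (start, start + size)


def group_into_windows(timestamps, size):
    """Bucket event timestamps by the fixed window each one falls into."""
    windows = {}
    for ts in timestamps:
        windows.setdefault(fixed_window(ts, size), []).append(ts)
    return windows


# One element per second for 16 seconds, grouped into 4-second windows.
for bounds, members in sorted(group_into_windows(range(16), 4).items()):
    print(bounds, members)
```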

Regards,
JP







From: XQ Hu 
Sent: Friday, March 8, 2024 3:51 PM
To: user@beam.apache.org
Cc: Puertos tavares, Jose J (Canada) 
Subject: [EXTERNAL] Re: [Question] Python Streaming Pipeline Support

Is this what you are looking for?

import random
import time

import apache_beam as beam
from apache_beam.transforms import trigger, window
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.utils.timestamp import Timestamp

with beam.Pipeline() as p:
    input = (
        p
        | PeriodicImpulse(
            start_timestamp=time.time(),
            stop_timestamp=time.time() + 16,
            fire_interval=1,
            apply_windowing=False,
        )
        | beam.Map(lambda x: random.random())
        | beam.WindowInto(window.FixedWindows(4))
        | beam.GroupBy()
        | "Print Windows"
        >> beam.transforms.util.LogElements(with_timestamp=True, with_window=True)
    )

On Fri, Mar 8, 2024 at 6:48 AM Puertos tavares, Jose J (Canada) via user
<user@beam.apache.org> wrote:
Hello Beam Users!

I was looking into a simple example in Python of an unbounded (--streaming
flag) pipeline that generates random numbers, applies a Fixed Window (let’s
say 5 seconds), then applies a group-by operation (reshuffle) and prints the
result just to check.

I notice that this seems to work as long as there is no grouping operation
(reshuffle, groupBy, etc.) that would leverage the windowing semantics.

# Get parameters from the command line for the pipeline
known_args, pipeline_options = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(flags=argv)

# Create pipeline
p = beam.Pipeline(options=pipeline_options)


# Execute pipeline
(p | "Start pipeline " >> beam.Create([0])
   | "Get values" >> beam.ParDo(RandomNumberGenerator())
   | 'Applied fixed windows ' >> beam.WindowInto(window.FixedWindows(1*5))
   | 'Reshuffle ' >> beam.Reshuffle()
   | "Print" >> beam.Map(lambda x: print("{} - {} ".format(os.getpid(), x), flush=True))
)

result = p.run()
result.wait_until_finish()


Even though the random generator is unbounded and tagged as such with the 
decorator, the pipeline seems to get stuck. If I make that step finite (i.e. 
by adding a counter and exiting), the code works in regular batch mode.

# =
# Class for Splittable DoFn random generated numbers
# =

@beam.transforms.core.DoFn.unbounded_per_element()
class RandomNumberGenerator(beam.DoFn):

    @beam.transforms.core.DoFn.unbounded_per_element()
    def process(self, element):
        import random
        import time

        counter = 0

        while True:

            #if counter>5:
            #    break
            nmb = random.randint(0, 1000)
            wait = random.randint(0, 5)
            rnow = time.time()

            print("Randy random", nmb)

            yield beam.window.TimestampedValue(nmb, rnow)
            time.sleep(wait)
            counter += 1

I have tried to implement the tracker and watermark as per the documentation, 
but none of that seems to work on either the DirectRunner or the FlinkRunner 
(even on the latter, where Reshuffle is not a custom operation but a vertex 
between the different ParDos). The pipeline just gets stuck.

I even tried using the native PeriodicImpulse [beam.apache.org] to factor out 
any of my implementation, but I still got the same result: the pipeline gets 
‘stuck’ on the GroupBy/Reshuffle operation.

Re: [Question] ReadFromKafka can't get messages.

2024-03-08 Thread Chamikara Jayalath via user
Which runner are you using ?

There's a known issue with SDFs not triggering for portable runners:
https://github.com/apache/beam/issues/20979

This should not occur for Dataflow.
For Flink, you could use the option "--experiments=use_deprecated_read" to
make it work.
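
As a rough sketch of how that option could be passed from Python: pipeline options are ordinary command-line style strings, so the flag can be appended to the argument list before it is handed to PipelineOptions. The helper name `with_experiment` and the example arguments are hypothetical, and the Beam call is shown only as a comment because it depends on the runner setup.

```python
def with_experiment(argv, experiment):
    """Return a copy of argv that contains --experiments=<experiment> exactly once."""
    flag = f"--experiments={experiment}"
    args = list(argv)
    if flag not in args:
        args.append(flag)
    return args


# Hypothetical arguments for a streaming job on the Flink runner.
args = with_experiment(["--runner=FlinkRunner", "--streaming"], "use_deprecated_read")
print(args)

# The resulting strings would then be passed on, for example:
#   from apache_beam.options.pipeline_options import PipelineOptions
#   options = PipelineOptions(args)
```

Calling the helper twice with the same experiment leaves the list unchanged, so it is safe to apply on top of arguments that already carry the flag.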

Thanks,
Cham

On Fri, Mar 8, 2024 at 8:10 AM LDesire  wrote:

> Hello Apache Beam community.
> I'm asking because while creating a beam pipeline in Python, ReadFromKafka
> is not working.
>
> My code looks like this
>
> ```
> import apache_beam as beam
> from apache_beam import Map
> from apache_beam.io.kafka import ReadFromKafka
> from apache_beam.options.pipeline_options import PipelineOptions
>
>
> @beam.ptransform_fn
> def LogElements(input):
>     # Print each element and pass it through unchanged.
>     def log_element(elem):
>         print(elem)
>         return elem
>
>     return (
>         input | 'DoLog' >> Map(log_element)
>     )
>
>
> if __name__ == '__main__':
>     consumer_config = {
>         'bootstrap.servers': ''
>     }
>
>     with beam.Pipeline(options=PipelineOptions(['--streaming'])) as p:
>         (
>             p | ReadFromKafka(consumer_config=consumer_config,
>                               topics=['input_topic'])
>             | "ToLines" >> beam.Map(lambda x: "%s %s" %
>                 (x[0].decode("utf-8"), x[1].decode("utf-8")))
>             | 'Logging' >> LogElements()
>         )
> ```
>
> This is a simple pipeline that simply subscribes to a topic in Kafka and
> outputs to the console.
>
> I've seen in the documentation that using ReadFromKafka in Python starts
> an external Java process.
> I also confirmed in VisualVM that this process is created just fine.
>
> However, I'm not sure why I'm not getting messages from Kafka.
>
> * I checked the `_really_start_process` method of the `SubprocessServer`
> class in subprocess_server.py to get the logs from the external Java
> process.
>
>
>
>


Re: How to change SQL dialect on beam_sql magic?

2024-03-08 Thread XQ Hu via user
I do not think the dialect argument is exposed here:
https://github.com/apache/beam/blob/a391198b5a632238dc4a9298e635bb5eb0f433df/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py#L293

Two options:
1) create a feature request and PR to add that
2) Switch to SqlTransform

On Mon, Mar 4, 2024 at 12:14 AM Jaehyeon Kim  wrote:

> Hello,
>
> beam_sql magic doesn't seem to have an option to specify an SQL dialect
> while the underlying SqlTransform has the dialect argument. Is there a way
> to specify an SQL dialect on a notebook?
>
> Cheers,
> Jaehyeon
>
>


Re: [Question] Python Streaming Pipeline Support

2024-03-08 Thread XQ Hu via user
Is this what you are looking for?

import random
import time

import apache_beam as beam
from apache_beam.transforms import trigger, window
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.utils.timestamp import Timestamp

with beam.Pipeline() as p:
    input = (
        p
        | PeriodicImpulse(
            start_timestamp=time.time(),
            stop_timestamp=time.time() + 16,
            fire_interval=1,
            apply_windowing=False,
        )
        | beam.Map(lambda x: random.random())
        | beam.WindowInto(window.FixedWindows(4))
        | beam.GroupBy()
        | "Print Windows"
        >> beam.transforms.util.LogElements(with_timestamp=True, with_window=True)
    )
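
To make the windowing step easier to reason about, here is a minimal plain-Python sketch of how a fixed window of size 4 maps a timestamp to a half-open interval [start, start + size). It is an illustration only, not Beam code: `fixed_window` and `group_by_window` are hypothetical helper names, and the sixteen integer timestamps are made up.

```python
def fixed_window(timestamp, size, offset=0):
    """Return the (start, end) of the fixed window that contains timestamp."""
    start = timestamp - (timestamp - offset) % size
    return (start, start + size)


def group_by_window(timestamps, size):
    """Bucket timestamps by the fixed window they fall into."""
    windows = {}
    for ts in timestamps:
        windows.setdefault(fixed_window(ts, size), []).append(ts)
    return windows


# Sixteen hypothetical one-second ticks starting at t=100, in 4-second windows.
ticks = [100 + i for i in range(16)]
for win, members in sorted(group_by_window(ticks, 4).items()):
    print(win, members)
```

Because window boundaries are aligned to multiples of the window size rather than to the first element, a start time that is not a multiple of 4 spreads the same sixteen ticks over five windows instead of four.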

On Fri, Mar 8, 2024 at 6:48 AM Puertos tavares, Jose J (Canada) via user <
user@beam.apache.org> wrote:

> *Hello Beam Users!*
>
> I was looking for a simple Python example of an unbounded (--streaming
> flag) pipeline that generates random numbers, applies a fixed window
> (let’s say 5 seconds), then applies a grouping operation (Reshuffle) and
> prints the result just to check.
>
> I noticed that this works as long as there is no grouping operation
> (Reshuffle, GroupBy, etc.) that relies on the windowing semantics.
>
> # Get parameters from the command line for the pipeline
> known_args, pipeline_options = parser.parse_known_args(argv)
> pipeline_options = PipelineOptions(flags=argv)
>
> # Create pipeline
> p = beam.Pipeline(options=pipeline_options)
>
> # Execute pipeline
> (p | "Start pipeline " >> beam.Create([0])
>    | "Get values" >> beam.ParDo(RandomNumberGenerator())
>    | 'Applied fixed windows ' >> beam.WindowInto(window.FixedWindows(1*5))
>    | 'Reshuffle ' >> beam.Reshuffle()
>    | "Print" >> beam.Map(lambda x: print("{} - {} ".format(os.getpid(), x), flush=True))
> )
>
> result = p.run()
> result.wait_until_finish()
>
> Even though the random generator is unbounded and tagged as such with the
> decorator, the pipeline seems to get stuck. If I make that step finite
> (i.e. by adding a counter and exiting), the code works in regular batch mode.
>
> # =
> # Class for Splittable DoFn random generated numbers
> # =
>
> @beam.transforms.core.DoFn.unbounded_per_element()
> class RandomNumberGenerator(beam.DoFn):
>
>     @beam.transforms.core.DoFn.unbounded_per_element()
>     def process(self, element):
>         import random
>         import time
>
>         counter = 0
>
>         while True:
>
>             #if counter>5:
>             #    break
>             nmb = random.randint(0, 1000)
>             wait = random.randint(0, 5)
>             rnow = time.time()
>
>             print("Randy random", nmb)
>
>             yield beam.window.TimestampedValue(nmb, rnow)
>             time.sleep(wait)
>             counter += 1
>
> I have tried to implement the tracker and watermark as per the
> documentation, but none of that seems to work on either the *DirectRunner
> or the FlinkRunner* (even on the latter, where Reshuffle is not a custom
> operation but a vertex between the different ParDos). The pipeline just
> gets stuck.
>
> I even tried using the native PeriodicImpulse to factor out any of my
> implementation, but I still got the same result: the pipeline gets ‘stuck’
> on the GroupBy/Reshuffle operation.
>
> In the past I have created streaming pipelines with the Java SDK using an
> Unbounded Source (now apparently obsolete according to the docs). However,
> I noticed that most of the unbounded Python readers, such as Kafka and
> PubSub, use ExternalTransforms behind the scenes, so I am starting to
> wonder whether such unbounded sources are supported natively in Python at
> all.
>
> I have searched the Internet and even tried LLMs for suggestions, but I
> have not been able to get a clear answer on how to achieve this in Python,
> or whether it is even possible. After spending a couple of days on it, I
> figured I could ask the Beam team and hear your thoughts. If you can point
> me to any sample that works, so I can analyze it and understand what is
> missing, it would be greatly appreciated.
>
> Regards,
>
> *JP – A fellow Apache Beam enthusiast *
>
> --
>
> The information in this 

[python] Side Inputs to CombineGlobally

2024-03-08 Thread Joey Tran
In the python SDK, should we be able to supply side inputs to
CombineGlobally?

I created an example here that fails at the pipeline translation stage
https://play.beam.apache.org/?sdk=python=vjM_k2TvNrf

It fails with
```
  File
"/Users/jtran/builds/2024-2/build/internal/lib/python3.11/site-packages/apache_beam/runners/portability/fn_api_runner/translations.py",
line 1367, in lift_combiners
expansion = lifted_stages if can_lift(transform) else unlifted_stages
 ^^^
  File
"/Users/jtran/builds/2024-2/build/internal/lib/python3.11/site-packages/apache_beam/runners/portability/fn_api_runner/translations.py",
line 1239, in can_lift
context.components.pcollections[only_element(
^
  File
"/Users/jtran/builds/2024-2/build/internal/lib/python3.11/site-packages/apache_beam/runners/portability/fn_api_runner/translations.py",
line 2082, in only_element
element, = iterable

ValueError: too many values to unpack (expected 1)
```
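
For readers unfamiliar with the idiom in the last frame, `element, = iterable` is plain Python tuple unpacking that succeeds only when the iterable yields exactly one item. The sketch below reproduces that behaviour outside Beam. It assumes, without confirmation from the truncated traceback, that the iterable holds the transform's inputs, so that a side input adds a second entry; the string values are placeholders.

```python
def only_element(iterable):
    # Same unpacking pattern as the last traceback frame:
    # raises ValueError unless the iterable has exactly one item.
    element, = iterable
    return element


# One input: unpacking succeeds.
print(only_element(["main_input"]))

# Two inputs (hypothetically main input plus side input): unpacking fails.
try:
    only_element(["main_input", "side_input"])
except ValueError as err:
    print("ValueError:", err)
```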


[Question] ReadFromKafka can't get messages.

2024-03-08 Thread LDesire
Hello Apache Beam community.
I'm asking because while creating a beam pipeline in Python, ReadFromKafka is 
not working.

My code looks like this

```
import apache_beam as beam
from apache_beam import Map
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions


@beam.ptransform_fn
def LogElements(input):
    # Print each element and pass it through unchanged.
    def log_element(elem):
        print(elem)
        return elem

    return (
        input | 'DoLog' >> Map(log_element)
    )


if __name__ == '__main__':
    consumer_config = {
        'bootstrap.servers': ''
    }

    with beam.Pipeline(options=PipelineOptions(['--streaming'])) as p:
        (
            p | ReadFromKafka(consumer_config=consumer_config,
                              topics=['input_topic'])
            | "ToLines" >> beam.Map(lambda x: "%s %s" %
                (x[0].decode("utf-8"), x[1].decode("utf-8")))
            | 'Logging' >> LogElements()
        )
```

This is a simple pipeline that simply subscribes to a topic in Kafka and 
outputs to the console.
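
Separately from the delivery problem, the "ToLines" step assumes that both the key and the value are bytes. Below is a minimal sketch of a more defensive formatter, assuming that a record without a key reaches Python as `None` (an assumption, not something confirmed in this thread). `format_record` is a hypothetical helper, not part of Beam.

```python
def format_record(record, encoding="utf-8"):
    """Format a (key, value) pair of bytes as 'key value', tolerating None."""
    key, value = record
    key_text = key.decode(encoding) if key is not None else ""
    value_text = value.decode(encoding) if value is not None else ""
    return "%s %s" % (key_text, value_text)


print(format_record((b"user-1", b"hello")))
print(format_record((None, b"message without a key")))
```

In the pipeline it could replace the lambda, for example `beam.Map(format_record)`.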

I've seen in the documentation that using ReadFromKafka in Python starts an 
external Java process.
I also confirmed in VisualVM that this process is created just fine.

However, I'm not sure why I'm not getting messages from Kafka.

* I checked the `_really_start_process` method of the `SubprocessServer` class 
in subprocess_server.py to get the logs from the external Java process.





[Question] Python Streaming Pipeline Support

2024-03-08 Thread Puertos tavares, Jose J (Canada) via user
Hello Beam Users!

I was looking for a simple Python example of an unbounded (--streaming flag) 
pipeline that generates random numbers, applies a fixed window (let's say 
5 seconds), then applies a grouping operation (Reshuffle) and prints the 
result just to check.

I noticed that this works as long as there is no grouping operation 
(Reshuffle, GroupBy, etc.) that relies on the windowing semantics.

# Get parameters from the command line for the pipeline
known_args, pipeline_options = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(flags=argv)

# Create pipeline
p = beam.Pipeline(options=pipeline_options)

# Execute pipeline
(p | "Start pipeline " >> beam.Create([0])
   | "Get values" >> beam.ParDo(RandomNumberGenerator())
   | 'Applied fixed windows ' >> beam.WindowInto(window.FixedWindows(1*5))
   | 'Reshuffle ' >> beam.Reshuffle()
   | "Print" >> beam.Map(lambda x: print("{} - {} ".format(os.getpid(), x), flush=True))
)

result = p.run()
result.wait_until_finish()


Even though the random generator is unbounded and tagged as such with the 
decorator, the pipeline seems to get stuck. If I make that step finite (i.e. 
by adding a counter and exiting), the code works in regular batch mode.

# =
# Class for Splittable DoFn random generated numbers
# =

@beam.transforms.core.DoFn.unbounded_per_element()
class RandomNumberGenerator(beam.DoFn):

    @beam.transforms.core.DoFn.unbounded_per_element()
    def process(self, element):
        import random
        import time

        counter = 0

        while True:

            #if counter>5:
            #    break
            nmb = random.randint(0, 1000)
            wait = random.randint(0, 5)
            rnow = time.time()

            print("Randy random", nmb)

            yield beam.window.TimestampedValue(nmb, rnow)
            time.sleep(wait)
            counter += 1

I have tried to implement the tracker and watermark as per the documentation, 
but none of that seems to work on either the DirectRunner or the FlinkRunner 
(even on the latter, where Reshuffle is not a custom operation but a vertex 
between the different ParDos). The pipeline just gets stuck.
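
One way to picture why a pipeline can run without a grouping step yet stall with one: under the default trigger, a grouping step holds the elements of a window until the watermark passes the end of that window, so if the source never advances its watermark, nothing is emitted. The following is a toy model of that behaviour in plain Python, not Beam's implementation, and it may or may not be what happens in this pipeline. The class and method names are hypothetical.

```python
from collections import defaultdict


class WindowedGrouper:
    """Toy model: buffer elements per fixed window, emit once the watermark passes."""

    def __init__(self, size):
        self.size = size
        self.buffers = defaultdict(list)
        self.watermark = float("-inf")

    def add(self, value, timestamp):
        """Buffer a value in the fixed window that contains its timestamp."""
        start = timestamp - timestamp % self.size
        self.buffers[(start, start + self.size)].append(value)

    def advance_watermark(self, new_watermark):
        """Move the watermark forward and return every window that is now complete."""
        self.watermark = max(self.watermark, new_watermark)
        ready = sorted(w for w in self.buffers if w[1] <= self.watermark)
        return [(w, self.buffers.pop(w)) for w in ready]


grouper = WindowedGrouper(size=5)
for ts, value in [(0, "a"), (3, "b"), (6, "c")]:
    grouper.add(value, ts)

print(grouper.advance_watermark(0))  # no window has closed yet
print(grouper.advance_watermark(5))  # the window (0, 5) is released
```

In this model, elements that only pass through element-wise steps would be visible immediately, while grouped output appears only after `advance_watermark` is called with a large enough value.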

I even tried using the native PeriodicImpulse to factor out any of my 
implementation, but I still got the same result: the pipeline gets 'stuck' on 
the GroupBy/Reshuffle operation.

In the past I have created streaming pipelines with the Java SDK using an 
Unbounded Source (now apparently obsolete according to the docs). However, I 
noticed that most of the unbounded Python readers, such as Kafka and PubSub, 
use ExternalTransforms behind the scenes, so I am starting to wonder whether 
such unbounded sources are supported natively in Python at all.

I have searched the Internet and even tried LLMs for suggestions, but I have 
not been able to get a clear answer on how to achieve this in Python, or 
whether it is even possible. After spending a couple of days on it, I figured 
I could ask the Beam team and hear your thoughts. If you can point me to any 
sample that works, so I can analyze it and understand what is missing, it 
would be greatly appreciated.



Regards,
JP - A fellow Apache Beam enthusiast



The information in this Internet Email is confidential and may be legally 
privileged. It is intended solely for the addressee. Access to this Email by 
anyone else is unauthorized. If you are not the intended recipient, any 
disclosure, copying, distribution or any action taken or omitted to be taken in 
reliance on it, is prohibited and may be unlawful. When addressed to our 
clients any opinions or advice contained in this Email are subject to the terms 
and conditions expressed in any applicable governing The Home Depot terms of 
business or client engagement letter. The Home Depot disclaims all 
responsibility and liability for the accuracy and content of this attachment 
and for any damages or losses arising from any inaccuracies, errors, viruses, 
e.g., worms, trojan horses, etc., or other items of a destructive nature, which 
may be contained in this attachment and shall not be liable for direct, 
indirect, consequential or special damages in connection with this e-mail 
message or its attachment.

